package org.apache.pinot.core.realtime.kafka;

import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.api.FetchRequest;
import kafka.api.PartitionMetadata;
import kafka.api.TopicMetadata;
import kafka.cluster.BrokerEndPoint;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import org.apache.kafka.common.protocol.Errors;
import org.apache.pinot.core.realtime.impl.kafka.KafkaPartitionLevelConsumer;
import org.apache.pinot.core.realtime.impl.kafka.KafkaSimpleConsumerFactory;
import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamMetadataProvider;
import org.apache.pinot.core.realtime.stream.OffsetCriteria;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.testng.Assert;
import org.testng.annotations.Test;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.Traversable;

/* loaded from: input_file:org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.class */
public class KafkaPartitionLevelConsumerTest {

    /* loaded from: input_file:org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest$MockKafkaSimpleConsumerFactory.class */
    public class MockKafkaSimpleConsumerFactory implements KafkaSimpleConsumerFactory {
        // Host name of each mock broker; matched case-insensitively in buildSimpleConsumer.
        private String[] hosts;
        // Port of each mock broker, parallel to hosts (the constructor enforces equal lengths).
        private int[] ports;
        // For each partition id, the index into brokerArray of the broker that leads it.
        private int[] partitionLeaderIndices;
        // Number of mock brokers, i.e. hosts.length.
        private int brokerCount;
        // Number of partitions reported for topicName in topic-metadata responses.
        private int partitionCount;
        // The only topic the mock consumers know; requests for any other topic get an error code.
        private String topicName;
        // One BrokerEndPoint per host/port pair; the array index doubles as the broker id.
        private BrokerEndPoint[] brokerArray;

        /* loaded from: input_file:org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest$MockKafkaSimpleConsumerFactory$MockFetchResponse.class */
        private class MockFetchResponse extends FetchResponse {
            Map<TopicAndPartition, Short> errorMap;

            public MockFetchResponse(Map<TopicAndPartition, Short> map) {
                super((kafka.api.FetchResponse) null);
                this.errorMap = map;
            }

            public ByteBufferMessageSet messageSet(String str, int i) {
                if (this.errorMap.containsKey(new TopicAndPartition(str, i))) {
                    throw new IllegalArgumentException();
                }
                return new ByteBufferMessageSet(Collections.emptyList());
            }

            public short errorCode(String str, int i) {
                TopicAndPartition topicAndPartition = new TopicAndPartition(str, i);
                return this.errorMap.containsKey(topicAndPartition) ? this.errorMap.get(topicAndPartition).shortValue() : Errors.NONE.code();
            }

            public long highWatermark(String str, int i) {
                return 0L;
            }

            public boolean hasError() {
                return !this.errorMap.isEmpty();
            }
        }

        /* loaded from: input_file:org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest$MockKafkaSimpleConsumerFactory$MockSimpleConsumer.class */
        private class MockSimpleConsumer extends SimpleConsumer {
            private int index;

            public MockSimpleConsumer(String str, int i, int i2, int i3, String str2, int i4) {
                super(str, i, i2, i3, str2);
                this.index = i4;
            }

            public FetchResponse fetch(FetchRequest fetchRequest) {
                HashMap hashMap = new HashMap();
                for (Traversable requestInfo = fetchRequest.requestInfo(); requestInfo.headOption().isDefined(); requestInfo = (Traversable) requestInfo.tail()) {
                    Tuple2 tuple2 = (Tuple2) requestInfo.head();
                    TopicAndPartition topicAndPartition = (TopicAndPartition) tuple2._1();
                    if (!topicAndPartition.topic().equals(MockKafkaSimpleConsumerFactory.this.topicName)) {
                        hashMap.put(topicAndPartition, Short.valueOf(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
                    } else if (MockKafkaSimpleConsumerFactory.this.partitionLeaderIndices.length < topicAndPartition.partition()) {
                        hashMap.put(topicAndPartition, Short.valueOf(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
                    } else if (MockKafkaSimpleConsumerFactory.this.partitionLeaderIndices[topicAndPartition.partition()] != this.index) {
                        hashMap.put(topicAndPartition, Short.valueOf(Errors.NOT_LEADER_FOR_PARTITION.code()));
                    }
                }
                return new MockFetchResponse(hashMap);
            }

            public FetchResponse fetch(kafka.javaapi.FetchRequest fetchRequest) {
                throw new RuntimeException("Unimplemented");
            }

            public OffsetResponse getOffsetsBefore(OffsetRequest offsetRequest) {
                throw new RuntimeException("Unimplemented!");
            }

            public TopicMetadataResponse send(TopicMetadataRequest topicMetadataRequest) {
                List list = topicMetadataRequest.topics();
                TopicMetadata[] topicMetadataArr = new TopicMetadata[list.size()];
                for (int i = 0; i < topicMetadataArr.length; i++) {
                    String str = (String) list.get(i);
                    if (str.equals(MockKafkaSimpleConsumerFactory.this.topicName)) {
                        PartitionMetadata[] partitionMetadataArr = new PartitionMetadata[MockKafkaSimpleConsumerFactory.this.partitionCount];
                        for (int i2 = 0; i2 < MockKafkaSimpleConsumerFactory.this.partitionCount; i2++) {
                            scala.collection.immutable.List list2 = JavaConversions.asScalaBuffer(Collections.emptyList()).toList();
                            partitionMetadataArr[i2] = new PartitionMetadata(i2, Some.apply(MockKafkaSimpleConsumerFactory.this.brokerArray[MockKafkaSimpleConsumerFactory.this.partitionLeaderIndices[i2]]), list2, list2, Errors.NONE.code());
                        }
                        topicMetadataArr[i] = new TopicMetadata(str, scala.collection.immutable.List.fromArray(partitionMetadataArr), Errors.NONE.code());
                    } else {
                        topicMetadataArr[i] = new TopicMetadata(str, (Seq) null, Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
                    }
                }
                return new TopicMetadataResponse(new kafka.api.TopicMetadataResponse(scala.collection.immutable.List.fromArray(MockKafkaSimpleConsumerFactory.this.brokerArray), scala.collection.immutable.List.fromArray(topicMetadataArr), -1));
            }
        }

        public MockKafkaSimpleConsumerFactory(String[] strArr, int[] iArr, long[] jArr, long[] jArr2, int[] iArr2, String str) {
            Preconditions.checkArgument(strArr.length == iArr.length);
            this.hosts = strArr;
            this.ports = iArr;
            this.brokerCount = strArr.length;
            this.brokerArray = new BrokerEndPoint[this.brokerCount];
            for (int i = 0; i < this.brokerCount; i++) {
                this.brokerArray[i] = new BrokerEndPoint(i, strArr[i], iArr[i]);
            }
            Preconditions.checkArgument(jArr.length == jArr2.length);
            Preconditions.checkArgument(jArr.length == iArr2.length);
            this.partitionLeaderIndices = iArr2;
            this.partitionCount = jArr.length;
            this.topicName = str;
        }

        /**
         * Returns a mock consumer bound to the configured broker that matches the given address.
         *
         * <p>Hosts are compared case-insensitively and ports exactly; the first matching broker wins.
         * The remaining arguments are forwarded unchanged to the mock consumer's constructor.
         *
         * @throws RuntimeException if no configured broker has this host/port combination
         */
        @Override
        public SimpleConsumer buildSimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId) {
            int brokerIndex = 0;
            while (brokerIndex < this.brokerCount) {
                boolean sameHost = this.hosts[brokerIndex].equalsIgnoreCase(host);
                if (sameHost && this.ports[brokerIndex] == port) {
                    return new MockSimpleConsumer(host, port, soTimeout, bufferSize, clientId, brokerIndex);
                }
                brokerIndex++;
            }
            throw new RuntimeException("No such host/port");
        }
    }

    /**
     * Checks the buffer size and socket timeout of the underlying simple consumer, first with
     * no explicit settings in the stream config and then with both values overridden.
     */
    @Test
    public void testBuildConsumer() throws Exception {
        MockKafkaSimpleConsumerFactory mockKafkaSimpleConsumerFactory = new MockKafkaSimpleConsumerFactory(
                new String[]{"abcd", "bcde"}, new int[]{1234, 2345}, new long[]{12345, 23456},
                new long[]{23456, 34567}, new int[]{0, 1}, "theTopic");

        Map<String, String> streamConfigMap = new HashMap<>();
        streamConfigMap.put("streamType", "kafka");
        streamConfigMap.put("stream.kafka.topic.name", "theTopic");
        streamConfigMap.put("stream.kafka.broker.list", "abcd:1234,bcde:2345");
        streamConfigMap.put("stream.kafka.consumer.type", "simple");
        streamConfigMap.put("stream.kafka.consumer.factory.class.name", mockKafkaSimpleConsumerFactory.getClass().getName());
        streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
        StreamConfig streamConfig = new StreamConfig(streamConfigMap);

        // The provider instance itself is not used; it is still constructed so that building it
        // with the mock factory keeps being exercised.
        new KafkaStreamMetadataProvider("clientId", streamConfig, mockKafkaSimpleConsumerFactory);

        // No buffer size / socket timeout configured: 512000 and 10000 are expected.
        KafkaPartitionLevelConsumer defaultConsumer = new KafkaPartitionLevelConsumer("clientId", streamConfig, 0, mockKafkaSimpleConsumerFactory);
        // A fetch is issued before the simple consumer is inspected (presumably the simple
        // consumer is only created once the partition-level consumer connects -- confirm).
        defaultConsumer.fetchMessages(12345L, 23456L, 10000);
        // TestNG's assertEquals takes (actual, expected); the arguments used to be reversed,
        // which would have produced misleading "expected ... but found ..." messages.
        Assert.assertEquals(defaultConsumer.getSimpleConsumer().bufferSize(), 512000);
        Assert.assertEquals(defaultConsumer.getSimpleConsumer().soTimeout(), 10000);

        // Explicit overrides must be picked up by a consumer built from a fresh config.
        streamConfigMap.put("stream.kafka.buffer.size", "100");
        streamConfigMap.put("stream.kafka.socket.timeout", "1000");
        KafkaPartitionLevelConsumer tunedConsumer = new KafkaPartitionLevelConsumer("clientId", new StreamConfig(streamConfigMap), 0, mockKafkaSimpleConsumerFactory);
        tunedConsumer.fetchMessages(12345L, 23456L, 10000);
        Assert.assertEquals(tunedConsumer.getSimpleConsumer().bufferSize(), 100);
        Assert.assertEquals(tunedConsumer.getSimpleConsumer().soTimeout(), 1000);
    }

    /**
     * Verifies that the metadata provider reports two partitions for the mock topic, matching
     * the two-partition layout handed to the mock factory.
     */
    @Test
    public void testGetPartitionCount() {
        String[] brokerHosts = {"abcd", "bcde"};
        int[] brokerPorts = {1234, 2345};
        long[] startOffsets = {12345, 23456};
        long[] endOffsets = {23456, 34567};
        int[] leaderIndices = {0, 1};
        MockKafkaSimpleConsumerFactory consumerFactory =
                new MockKafkaSimpleConsumerFactory(brokerHosts, brokerPorts, startOffsets, endOffsets, leaderIndices, "theTopic");

        Map<String, String> streamProperties = new HashMap<>();
        streamProperties.put("streamType", "kafka");
        streamProperties.put("stream.kafka.topic.name", "theTopic");
        streamProperties.put("stream.kafka.broker.list", "abcd:1234,bcde:2345");
        streamProperties.put("stream.kafka.consumer.type", "simple");
        streamProperties.put("stream.kafka.consumer.factory.class.name", consumerFactory.getClass().getName());
        streamProperties.put("stream.kafka.decoder.class.name", "decoderClass");

        StreamConfig streamConfig = new StreamConfig(streamProperties);
        KafkaStreamMetadataProvider metadataProvider = new KafkaStreamMetadataProvider("clientId", streamConfig, consumerFactory);
        Assert.assertEquals(metadataProvider.fetchPartitionCount(10000L), 2);
    }

    /**
     * Smoke test: fetching from partition 0 through the mock factory must complete without
     * throwing. The fetched batch itself is not inspected.
     */
    @Test
    public void testFetchMessages() throws Exception {
        String[] brokerHosts = {"abcd", "bcde"};
        int[] brokerPorts = {1234, 2345};
        long[] startOffsets = {12345, 23456};
        long[] endOffsets = {23456, 34567};
        int[] leaderIndices = {0, 1};
        MockKafkaSimpleConsumerFactory consumerFactory =
                new MockKafkaSimpleConsumerFactory(brokerHosts, brokerPorts, startOffsets, endOffsets, leaderIndices, "theTopic");

        Map<String, String> streamProperties = new HashMap<>();
        streamProperties.put("streamType", "kafka");
        streamProperties.put("stream.kafka.topic.name", "theTopic");
        streamProperties.put("stream.kafka.broker.list", "abcd:1234,bcde:2345");
        streamProperties.put("stream.kafka.consumer.type", "simple");
        streamProperties.put("stream.kafka.consumer.factory.class.name", consumerFactory.getClass().getName());
        streamProperties.put("stream.kafka.decoder.class.name", "decoderClass");

        StreamConfig streamConfig = new StreamConfig(streamProperties);
        KafkaPartitionLevelConsumer partitionConsumer = new KafkaPartitionLevelConsumer("clientId", streamConfig, 0, consumerFactory);
        partitionConsumer.fetchMessages(12345L, 23456L, 10000);
    }

    /**
     * Disabled test that requests the smallest offset of partition 0 through the metadata
     * provider. It only checks that the call completes; the returned offset is not inspected.
     */
    @Test(enabled = false)
    public void testFetchOffsets() throws Exception {
        String[] brokerHosts = {"abcd", "bcde"};
        int[] brokerPorts = {1234, 2345};
        long[] startOffsets = {12345, 23456};
        long[] endOffsets = {23456, 34567};
        int[] leaderIndices = {0, 1};
        MockKafkaSimpleConsumerFactory consumerFactory =
                new MockKafkaSimpleConsumerFactory(brokerHosts, brokerPorts, startOffsets, endOffsets, leaderIndices, "theTopic");

        Map<String, String> streamProperties = new HashMap<>();
        streamProperties.put("streamType", "kafka");
        streamProperties.put("stream.kafka.topic.name", "theTopic");
        streamProperties.put("stream.kafka.broker.list", "abcd:1234,bcde:2345");
        streamProperties.put("stream.kafka.consumer.type", "simple");
        streamProperties.put("stream.kafka.consumer.factory.class.name", consumerFactory.getClass().getName());
        streamProperties.put("stream.kafka.decoder.class.name", "decoderClass");

        StreamConfig streamConfig = new StreamConfig(streamProperties);
        KafkaStreamMetadataProvider metadataProvider = new KafkaStreamMetadataProvider("clientId", streamConfig, 0, consumerFactory);
        OffsetCriteria smallestOffset = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest();
        metadataProvider.fetchPartitionOffset(smallestOffset, 10000L);
    }
}
