package io.confluent.kafkarest;

import io.confluent.kafka.api.PartitionFetchInfo;
import io.confluent.kafka.cluster.Broker;
import io.confluent.kafka.cluster.EndPoint;
import io.confluent.kafka.common.TopicAndPartition;
import io.confluent.kafka.javaapi.FetchRequest;
import io.confluent.kafka.javaapi.FetchResponse;
import io.confluent.kafka.javaapi.message.ByteBufferMessageSet;
import io.confluent.kafka.message.MessageAndMetadata;
import io.confluent.kafka.message.MessageAndOffset;
import io.confluent.kafka.serializer.Decoder;
import io.confluent.kafka.serializer.DefaultDecoder;
import io.confluent.kafka.shaded.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.shaded.serializers.KafkaAvroDecoder;
import io.confluent.kafka.shaded.serializers.KafkaJsonDecoder;
import io.confluent.kafka.utils.VerifiableProperties;
import io.confluent.kafkarest.converters.AvroConverter;
import io.confluent.kafkarest.entities.AvroConsumerRecord;
import io.confluent.kafkarest.entities.BinaryConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.JsonConsumerRecord;
import io.confluent.org.apache.kafka.common.record.TimestampType;
import io.confluent.org.apache.kafka.common.security.auth.SecurityProtocol;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:io/confluent/kafkarest/SimpleConsumerManager.class */
public class SimpleConsumerManager {
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumerManager.class);
    private final int maxPoolSize;
    private final int poolInstanceAvailabilityTimeoutMs;
    private final Time time;
    private final MetadataObserver mdObserver;
    private final SimpleConsumerFactory simpleConsumerFactory;
    private final Decoder<Object> avroDecoder;
    private final Decoder<byte[]> binaryDecoder;
    private final Decoder<Object> jsonDecoder;
    private AtomicInteger correlationId = new AtomicInteger(0);
    private final ConcurrentMap<Broker, SimpleConsumerPool> simpleConsumersPools = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.confluent.kafkarest.SimpleConsumerManager$1, reason: invalid class name */
    /* loaded from: input_file:io/confluent/kafkarest/SimpleConsumerManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$confluent$kafkarest$entities$EmbeddedFormat = new int[EmbeddedFormat.values().length];

        static {
            try {
                $SwitchMap$io$confluent$kafkarest$entities$EmbeddedFormat[EmbeddedFormat.BINARY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$confluent$kafkarest$entities$EmbeddedFormat[EmbeddedFormat.AVRO.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$confluent$kafkarest$entities$EmbeddedFormat[EmbeddedFormat.JSON.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public SimpleConsumerManager(KafkaRestConfig kafkaRestConfig, MetadataObserver metadataObserver, SimpleConsumerFactory simpleConsumerFactory) {
        this.mdObserver = metadataObserver;
        this.simpleConsumerFactory = simpleConsumerFactory;
        this.maxPoolSize = kafkaRestConfig.getInt("simpleconsumer.pool.size.max").intValue();
        this.poolInstanceAvailabilityTimeoutMs = kafkaRestConfig.getInt("simpleconsumer.pool.timeout.ms").intValue();
        this.time = kafkaRestConfig.getTime();
        Properties properties = new Properties();
        properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaRestConfig.getString(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG));
        this.avroDecoder = new KafkaAvroDecoder(new VerifiableProperties(properties));
        this.binaryDecoder = new DefaultDecoder(new VerifiableProperties());
        this.jsonDecoder = new KafkaJsonDecoder(new VerifiableProperties());
    }

    private SimpleConsumerPool createSimpleConsumerPool() {
        return new SimpleConsumerPool(this.maxPoolSize, this.poolInstanceAvailabilityTimeoutMs, this.time, this.simpleConsumerFactory);
    }

    private SimpleFetcher getSimpleFetcher(Broker broker) {
        SimpleConsumerPool simpleConsumerPool = this.simpleConsumersPools.get(broker);
        if (simpleConsumerPool == null) {
            this.simpleConsumersPools.putIfAbsent(broker, createSimpleConsumerPool());
            simpleConsumerPool = this.simpleConsumersPools.get(broker);
        }
        for (EndPoint endPoint : JavaConversions.asJavaCollection(broker.endPoints())) {
            if (endPoint.securityProtocol() == SecurityProtocol.PLAINTEXT) {
                return simpleConsumerPool.get(endPoint.host(), endPoint.port());
            }
        }
        throw Errors.noSslSupportException();
    }

    public void consume(String str, int i, long j, long j2, EmbeddedFormat embeddedFormat, ConsumerReadCallback consumerReadCallback) {
        ArrayList arrayList = null;
        Exception exc = null;
        SimpleFetcher simpleFetcher = null;
        try {
            try {
                simpleFetcher = getSimpleFetcher(this.mdObserver.getLeader(str, i));
                arrayList = new ArrayList();
                int i2 = 0;
                while (j2 > 0) {
                    i2++;
                    log.debug("Simple consumer " + simpleFetcher.clientId() + ": fetch " + i2 + "; " + j2 + " messages remaining");
                    ByteBufferMessageSet fetchRecords = fetchRecords(str, i, j, simpleFetcher);
                    if (!fetchRecords.iterator().hasNext()) {
                        break;
                    }
                    Iterator<MessageAndOffset> it = fetchRecords.iterator();
                    while (it.hasNext()) {
                        MessageAndOffset next = it.next();
                        if (next.offset() >= j) {
                            arrayList.add(createConsumerRecord(next, str, i, embeddedFormat));
                            j2--;
                            j++;
                            if (j2 == 0) {
                                break;
                            }
                        }
                    }
                }
                if (simpleFetcher != null) {
                    try {
                        simpleFetcher.close();
                    } catch (Exception e) {
                        log.error("Unable to release SimpleConsumer {} into the pool", simpleFetcher.clientId(), e);
                    }
                }
            } catch (Throwable th) {
                if (simpleFetcher != null) {
                    try {
                        simpleFetcher.close();
                    } catch (Exception e2) {
                        log.error("Unable to release SimpleConsumer {} into the pool", simpleFetcher.clientId(), e2);
                    }
                }
                throw th;
            }
        } catch (Throwable th2) {
            exc = th2 instanceof RestException ? th2 : Errors.kafkaErrorException(th2);
            if (simpleFetcher != null) {
                try {
                    simpleFetcher.close();
                } catch (Exception e3) {
                    log.error("Unable to release SimpleConsumer {} into the pool", simpleFetcher.clientId(), e3);
                }
            }
        }
        consumerReadCallback.onCompletion(arrayList, exc);
    }

    private BinaryConsumerRecord createBinaryConsumerRecord(MessageAndOffset messageAndOffset, String str, int i) {
        MessageAndMetadata messageAndMetadata = new MessageAndMetadata(str, i, messageAndOffset.message(), messageAndOffset.offset(), this.binaryDecoder, this.binaryDecoder, 0L, TimestampType.CREATE_TIME);
        return new BinaryConsumerRecord(str, (byte[]) messageAndMetadata.key(), (byte[]) messageAndMetadata.message(), i, messageAndOffset.offset());
    }

    private AvroConsumerRecord createAvroConsumerRecord(MessageAndOffset messageAndOffset, String str, int i) {
        MessageAndMetadata messageAndMetadata = new MessageAndMetadata(str, i, messageAndOffset.message(), messageAndOffset.offset(), this.avroDecoder, this.avroDecoder, 0L, TimestampType.CREATE_TIME);
        return new AvroConsumerRecord(str, AvroConverter.toJson(messageAndMetadata.key()).json, AvroConverter.toJson(messageAndMetadata.message()).json, i, messageAndOffset.offset());
    }

    private JsonConsumerRecord createJsonConsumerRecord(MessageAndOffset messageAndOffset, String str, int i) {
        MessageAndMetadata messageAndMetadata = new MessageAndMetadata(str, i, messageAndOffset.message(), messageAndOffset.offset(), this.jsonDecoder, this.jsonDecoder, 0L, TimestampType.CREATE_TIME);
        return new JsonConsumerRecord(str, messageAndMetadata.key(), messageAndMetadata.message(), i, messageAndOffset.offset());
    }

    private ConsumerRecord createConsumerRecord(MessageAndOffset messageAndOffset, String str, int i, EmbeddedFormat embeddedFormat) {
        switch (AnonymousClass1.$SwitchMap$io$confluent$kafkarest$entities$EmbeddedFormat[embeddedFormat.ordinal()]) {
            case 1:
                return createBinaryConsumerRecord(messageAndOffset, str, i);
            case 2:
                return createAvroConsumerRecord(messageAndOffset, str, i);
            case 3:
                return createJsonConsumerRecord(messageAndOffset, str, i);
            default:
                throw new RestServerErrorException("Invalid embedded format for new consumer.", Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
        }
    }

    private ByteBufferMessageSet fetchRecords(String str, int i, long j, SimpleFetcher simpleFetcher) {
        SimpleConsumerConfig simpleConsumerConfig = this.simpleConsumerFactory.getSimpleConsumerConfig();
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicAndPartition(str, i), new PartitionFetchInfo(j, simpleConsumerConfig.fetchMessageMaxBytes()));
        FetchResponse fetch = simpleFetcher.fetch(new FetchRequest(this.correlationId.incrementAndGet(), simpleFetcher.clientId(), simpleConsumerConfig.fetchWaitMaxMs(), simpleConsumerConfig.fetchMinBytes(), hashMap));
        if (!fetch.hasError()) {
            return fetch.messageSet(str, i);
        }
        throw Errors.kafkaErrorException(new Exception("Fetch response contains an error code: " + ((int) fetch.errorCode(str, i))));
    }

    public void shutdown() {
        Iterator<SimpleConsumerPool> it = this.simpleConsumersPools.values().iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
    }
}
