package co.cask.cdap.logging.kafka;

import co.cask.cdap.logging.LoggingConfiguration;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetOutOfRangeException;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/kafka/KafkaConsumer.class */
public final class KafkaConsumer implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    public static final long EARLIEST_OFFSET = -2;
    public static final long LATEST_OFFSET = -1;
    private static final int MAX_KAFKA_FETCH_RETRIES = 5;
    public static final int BUFFER_SIZE_BYTES = 1048576;
    public static final int TIMEOUT_MS = 3000;
    private final List<LoggingConfiguration.KafkaHost> replicaBrokers;
    private final String topic;
    private final int partition;
    private final int fetchTimeoutMs;
    private final String clientName;
    private volatile SimpleConsumer consumer;

    public KafkaConsumer(List<LoggingConfiguration.KafkaHost> list, String str, int i, int i2) {
        this.replicaBrokers = Lists.newArrayList(list);
        this.topic = str;
        this.partition = i;
        this.fetchTimeoutMs = i2;
        this.clientName = String.format("%s_%s_%d", getClass().getName(), str, Integer.valueOf(i));
    }

    public int fetchMessages(long j, Callback callback) throws OffsetOutOfRangeException {
        int i = 0;
        Iterator it = fetchMessageSet(j).iterator();
        while (it.hasNext()) {
            MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
            i++;
            callback.handle(messageAndOffset.offset(), messageAndOffset.message().payload());
        }
        return i;
    }

    public long fetchOffsetBefore(long j) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(this.topic, this.partition);
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(j, 1));
        OffsetRequest offsetRequest = new OffsetRequest(newHashMap, kafka.api.OffsetRequest.CurrentVersion(), this.clientName);
        if (this.consumer == null) {
            findLeader();
        }
        OffsetResponse offsetsBefore = this.consumer.getOffsetsBefore(offsetRequest);
        if (offsetsBefore.hasError()) {
            findLeader();
            offsetsBefore = this.consumer.getOffsetsBefore(offsetRequest);
            if (offsetsBefore.hasError()) {
                closeConsumer();
                String format = String.format("Error fetching offset data from broker %s:%d for topic %s, partition %d. Error code: %d", this.consumer.host(), Integer.valueOf(this.consumer.port()), this.topic, Integer.valueOf(this.partition), Short.valueOf(offsetsBefore.errorCode(this.topic, this.partition)));
                LOG.error(format);
                throw new RuntimeException(format);
            }
        }
        long[] offsets = offsetsBefore.offsets(this.topic, this.partition);
        if (offsets.length != 0) {
            return offsets[0];
        }
        if (j != -2) {
            return fetchOffsetBefore(-2L);
        }
        try {
            String format2 = String.format("Got zero offsets in offset response for time %s from broker %s:%d for topic %s, partiton %d", Long.valueOf(j), this.consumer.host(), Integer.valueOf(this.consumer.port()), this.topic, Integer.valueOf(this.partition));
            LOG.error(format2);
            throw new RuntimeException(format2);
        } catch (Throwable th) {
            closeConsumer();
            throw th;
        }
    }

    private void closeConsumer() {
        if (this.consumer != null) {
            this.consumer.close();
            this.consumer = null;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        closeConsumer();
    }

    private ByteBufferMessageSet fetchMessageSet(long j) throws OffsetOutOfRangeException {
        Preconditions.checkArgument(j >= 0, String.format("Illegal fetch offset %d", Long.valueOf(j)));
        short s = 0;
        for (int i = 0; i < MAX_KAFKA_FETCH_RETRIES; i++) {
            if (this.consumer == null) {
                findLeader();
            }
            FetchResponse fetch = this.consumer.fetch(new FetchRequestBuilder().clientId(this.clientName).addFetch(this.topic, this.partition, j, BUFFER_SIZE_BYTES).maxWait(this.fetchTimeoutMs).build());
            if (!fetch.hasError()) {
                return fetch.messageSet(this.topic, this.partition);
            }
            s = fetch.errorCode(this.topic, this.partition);
            LOG.warn(String.format("Error fetching data from broker %s:%d for topic %s, partition %d. Error code: %d", this.consumer.host(), Integer.valueOf(this.consumer.port()), this.topic, Integer.valueOf(this.partition), Short.valueOf(s)));
            if (s == ErrorMapping.OffsetOutOfRangeCode()) {
                throw new OffsetOutOfRangeException(String.format("Requested offset %d is out of range for topic %s partition %d", Long.valueOf(j), this.topic, Integer.valueOf(this.partition)));
            }
            findLeader();
        }
        String format = String.format("Error fetching data from broker %s:%d for topic %s, partition %d. Error code: %d", this.consumer.host(), Integer.valueOf(this.consumer.port()), this.topic, Integer.valueOf(this.partition), Short.valueOf(s));
        LOG.error(format);
        throw new RuntimeException(format);
    }

    private void saveReplicaBrokers(PartitionMetadata partitionMetadata) {
        if (partitionMetadata != null) {
            this.replicaBrokers.clear();
            for (Broker broker : partitionMetadata.replicas()) {
                this.replicaBrokers.add(new LoggingConfiguration.KafkaHost(broker.host(), broker.port()));
            }
        }
    }

    private PartitionMetadata fetchPartitonMetadata() {
        for (LoggingConfiguration.KafkaHost kafkaHost : this.replicaBrokers) {
            SimpleConsumer simpleConsumer = new SimpleConsumer(kafkaHost.getHostname(), kafkaHost.getPort(), TIMEOUT_MS, BUFFER_SIZE_BYTES, this.clientName);
            try {
                Iterator it = simpleConsumer.send(new TopicMetadataRequest(ImmutableList.of(this.topic))).topicsMetadata().iterator();
                while (it.hasNext()) {
                    for (PartitionMetadata partitionMetadata : ((TopicMetadata) it.next()).partitionsMetadata()) {
                        if (partitionMetadata.partitionId() == this.partition) {
                            return partitionMetadata;
                        }
                    }
                }
                simpleConsumer.close();
            } finally {
                simpleConsumer.close();
            }
        }
        return null;
    }

    private void findLeader() {
        closeConsumer();
        PartitionMetadata fetchPartitonMetadata = fetchPartitonMetadata();
        if (fetchPartitonMetadata == null) {
            String format = String.format("Could not find leader for topic %s, partition %d", this.topic, Integer.valueOf(this.partition));
            LOG.error(format);
            throw new RuntimeException(format);
        }
        if (fetchPartitonMetadata.leader() == null) {
            LOG.warn("Can't find leader for topic {} and partition {} with brokers {}.", new Object[]{this.topic, Integer.valueOf(this.partition), this.replicaBrokers, ErrorMapping.exceptionFor(fetchPartitonMetadata.errorCode())});
            throw new RuntimeException(ErrorMapping.exceptionFor(fetchPartitonMetadata.errorCode()));
        }
        this.consumer = new SimpleConsumer(fetchPartitonMetadata.leader().host(), fetchPartitonMetadata.leader().port(), TIMEOUT_MS, BUFFER_SIZE_BYTES, this.clientName);
        saveReplicaBrokers(fetchPartitonMetadata);
    }
}
