package co.cask.cdap.logging.pipeline.kafka;

import com.google.common.collect.ImmutableMap;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/pipeline/kafka/KafkaUtil.class */
public final class KafkaUtil {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtil.class);

    public static long getOffsetByTimestamp(SimpleConsumer simpleConsumer, String str, int i, long j) throws KafkaException {
        OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new OffsetRequest(ImmutableMap.of(new TopicAndPartition(str, i), new PartitionOffsetRequestInfo(j, 1)), kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId()));
        if (offsetsBefore.hasError()) {
            throw Errors.forCode(offsetsBefore.errorCode(str, i)).exception();
        }
        long[] offsets = offsetsBefore.offsets(str, i);
        if (offsets.length != 0) {
            LOG.debug("Offset {} fetched for {}:{} with timestamp {}.", new Object[]{Long.valueOf(offsets[0]), str, Integer.valueOf(i), Long.valueOf(j)});
            return offsets[0];
        }
        if (j != kafka.api.OffsetRequest.EarliestTime()) {
            return getOffsetByTimestamp(simpleConsumer, str, i, kafka.api.OffsetRequest.EarliestTime());
        }
        throw new UnknownServerException("Empty offsets received from offsets request on " + str + ":" + i + " from broker " + simpleConsumer.host() + ":" + simpleConsumer.port());
    }

    public static ByteBufferMessageSet fetchMessages(SimpleConsumer simpleConsumer, String str, int i, int i2, long j) throws KafkaException {
        FetchResponse fetch = simpleConsumer.fetch(new FetchRequestBuilder().clientId(simpleConsumer.clientId()).addFetch(str, i, j, i2).build());
        if (fetch.hasError()) {
            throw Errors.forCode(fetch.errorCode(str, i)).exception();
        }
        return fetch.messageSet(str, i);
    }

    private KafkaUtil() {
    }
}
