package co.cask.cdap.logging.read;

import ch.qos.logback.classic.spi.ILoggingEvent;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.logging.LoggingConfiguration;
import co.cask.cdap.logging.appender.kafka.KafkaTopic;
import co.cask.cdap.logging.appender.kafka.LoggingEventSerializer;
import co.cask.cdap.logging.appender.kafka.StringPartitioner;
import co.cask.cdap.logging.context.LoggingContextHelper;
import co.cask.cdap.logging.filter.AndFilter;
import co.cask.cdap.logging.filter.Filter;
import co.cask.cdap.logging.kafka.KafkaConsumer;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/read/KafkaLogReader.class */
public class KafkaLogReader implements LogReader {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaLogReader.class);
    private static final int KAFKA_FETCH_TIMEOUT_MS = 30000;
    private final List<LoggingConfiguration.KafkaHost> seedBrokers;
    private final String topic;
    private final LoggingEventSerializer serializer;
    private final StringPartitioner partitioner;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/logging/read/KafkaLogReader$KafkaCallback.class */
    public static class KafkaCallback implements co.cask.cdap.logging.kafka.Callback {
        private final Filter logFilter;
        private final LoggingEventSerializer serializer;
        private final long stopOffset;
        private final int maxEvents;
        private final Callback callback;
        private final long fromTimeMs;
        private LogOffset firstOffset;
        private LogOffset lastOffset;
        private int count;

        private KafkaCallback(Filter filter, LoggingEventSerializer loggingEventSerializer, long j, int i, Callback callback, long j2) {
            this.count = 0;
            this.logFilter = filter;
            this.serializer = loggingEventSerializer;
            this.stopOffset = j;
            this.maxEvents = i;
            this.callback = callback;
            this.fromTimeMs = j2;
        }

        @Override // co.cask.cdap.logging.kafka.Callback
        public void handle(long j, ByteBuffer byteBuffer) {
            ILoggingEvent fromBytes = this.serializer.fromBytes(byteBuffer);
            LogOffset logOffset = new LogOffset(j, fromBytes.getTimeStamp());
            if (j < this.stopOffset && this.count < this.maxEvents && this.logFilter.match(fromBytes) && fromBytes.getTimeStamp() > this.fromTimeMs) {
                this.count++;
                this.callback.handle(new LogEvent(fromBytes, logOffset));
            }
            if (this.firstOffset == null) {
                this.firstOffset = logOffset;
            }
            this.lastOffset = logOffset;
        }

        public LogOffset getFirstOffset() {
            return this.firstOffset;
        }

        public LogOffset getLastOffset() {
            return this.lastOffset;
        }

        public int getCount() {
            return this.count;
        }
    }

    @Inject
    public KafkaLogReader(CConfiguration cConfiguration, StringPartitioner stringPartitioner) {
        try {
            this.seedBrokers = LoggingConfiguration.getKafkaSeedBrokers(cConfiguration.get(LoggingConfiguration.KAFKA_SEED_BROKERS));
            Preconditions.checkArgument(!this.seedBrokers.isEmpty(), "Kafka seed brokers list is empty!");
            this.topic = KafkaTopic.getTopic();
            Preconditions.checkArgument(!this.topic.isEmpty(), "Kafka topic is emtpty!");
            this.partitioner = stringPartitioner;
            this.serializer = new LoggingEventSerializer();
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLogNext(LoggingContext loggingContext, ReadRange readRange, int i, Filter filter, Callback callback) {
        if (readRange.getKafkaOffset() == ReadRange.LATEST.getKafkaOffset()) {
            getLogPrev(loggingContext, readRange, i, filter, callback);
            return;
        }
        int partition = this.partitioner.partition(loggingContext.getLogPartition(), -1);
        callback.init();
        KafkaConsumer kafkaConsumer = new KafkaConsumer(this.seedBrokers, this.topic, partition, KAFKA_FETCH_TIMEOUT_MS);
        try {
            try {
                if (readRange.getKafkaOffset() == LogOffset.INVALID_KAFKA_OFFSET) {
                    readRange = new ReadRange(readRange.getFromMillis(), readRange.getToMillis(), kafkaConsumer.fetchOffsetBefore(readRange.getFromMillis()));
                }
                AndFilter andFilter = new AndFilter(ImmutableList.of(LoggingContextHelper.createFilter(loggingContext), filter));
                long fetchOffsetBefore = kafkaConsumer.fetchOffsetBefore(-1L);
                long kafkaOffset = readRange.getKafkaOffset() + 1;
                if (kafkaOffset >= fetchOffsetBefore) {
                    try {
                        kafkaConsumer.close();
                        return;
                    } catch (IOException e) {
                        LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", this.topic, Integer.valueOf(partition)), e);
                        return;
                    }
                }
                fetchLogEvents(kafkaConsumer, andFilter, kafkaOffset, fetchOffsetBefore, i, callback, readRange);
                try {
                    kafkaConsumer.close();
                } catch (IOException e2) {
                    LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", this.topic, Integer.valueOf(partition)), e2);
                }
            } catch (Throwable th) {
                try {
                    kafkaConsumer.close();
                } catch (IOException e3) {
                    LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", this.topic, Integer.valueOf(partition)), e3);
                }
                throw th;
            }
        } catch (Throwable th2) {
            LOG.error("Got exception: ", th2);
            throw Throwables.propagate(th2);
        }
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLogPrev(LoggingContext loggingContext, ReadRange readRange, int i, Filter filter, Callback callback) {
        if (readRange.getKafkaOffset() == LogOffset.INVALID_KAFKA_OFFSET) {
            readRange = new ReadRange(readRange.getFromMillis(), readRange.getToMillis(), ReadRange.LATEST.getKafkaOffset());
        }
        int partition = this.partitioner.partition(loggingContext.getLogPartition(), -1);
        callback.init();
        KafkaConsumer kafkaConsumer = new KafkaConsumer(this.seedBrokers, this.topic, partition, KAFKA_FETCH_TIMEOUT_MS);
        try {
            try {
                AndFilter andFilter = new AndFilter(ImmutableList.of(LoggingContextHelper.createFilter(loggingContext), filter));
                long fetchOffsetBefore = kafkaConsumer.fetchOffsetBefore(-1L);
                long fetchOffsetBefore2 = kafkaConsumer.fetchOffsetBefore(-2L);
                long kafkaOffset = readRange.getKafkaOffset() < 0 ? fetchOffsetBefore : readRange.getKafkaOffset();
                long j = kafkaOffset - i;
                if (j < fetchOffsetBefore2) {
                    j = fetchOffsetBefore2;
                }
                if (j >= kafkaOffset || j >= fetchOffsetBefore) {
                    try {
                        kafkaConsumer.close();
                        return;
                    } catch (IOException e) {
                        LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", this.topic, Integer.valueOf(partition)), e);
                        return;
                    }
                }
                int i2 = 0;
                while (i2 == 0) {
                    i2 = fetchLogEvents(kafkaConsumer, andFilter, j, kafkaOffset, i, callback, readRange);
                    kafkaOffset = j;
                    if (kafkaOffset <= fetchOffsetBefore2) {
                        break;
                    }
                    j = kafkaOffset - i;
                    if (j < fetchOffsetBefore2) {
                        j = fetchOffsetBefore2;
                    }
                }
                try {
                    kafkaConsumer.close();
                } catch (IOException e2) {
                    LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", this.topic, Integer.valueOf(partition)), e2);
                }
            } catch (Throwable th) {
                LOG.error("Got exception: ", th);
                throw Throwables.propagate(th);
            }
        } catch (Throwable th2) {
            try {
                kafkaConsumer.close();
            } catch (IOException e3) {
                LOG.error(String.format("Caught exception when closing KafkaConsumer for topic %s, partition %d", this.topic, Integer.valueOf(partition)), e3);
            }
            throw th2;
        }
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLog(LoggingContext loggingContext, long j, long j2, Filter filter, Callback callback) {
        throw new UnsupportedOperationException("Getting logs by time is not supported by " + KafkaLogReader.class.getSimpleName());
    }

    private int fetchLogEvents(KafkaConsumer kafkaConsumer, Filter filter, long j, long j2, int i, Callback callback, ReadRange readRange) {
        KafkaCallback kafkaCallback = new KafkaCallback(filter, this.serializer, j2, i, callback, readRange.getFromMillis());
        while (kafkaCallback.getCount() < i && j < j2) {
            kafkaConsumer.fetchMessages(j, kafkaCallback);
            LogOffset lastOffset = kafkaCallback.getLastOffset();
            LogOffset firstOffset = kafkaCallback.getFirstOffset();
            if (lastOffset == null || firstOffset.getTime() < readRange.getFromMillis() || lastOffset.getTime() > readRange.getToMillis()) {
                break;
            }
            j = kafkaCallback.getLastOffset().getKafkaOffset() + 1;
        }
        return kafkaCallback.getCount();
    }
}
