package co.cask.cdap.logging.read;

import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.logging.appender.kafka.KafkaTopic;
import co.cask.cdap.logging.appender.kafka.StringPartitioner;
import co.cask.cdap.logging.filter.Filter;
import co.cask.cdap.logging.save.CheckpointManager;
import co.cask.cdap.logging.save.CheckpointManagerFactory;
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/read/DistributedLogReader.class */
public final class DistributedLogReader implements LogReader {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedLogReader.class);
    private final KafkaLogReader kafkaLogReader;
    private final FileLogReader fileLogReader;
    private final CheckpointManager checkpointManager;
    private final StringPartitioner partitioner;

    @Inject
    public DistributedLogReader(KafkaLogReader kafkaLogReader, FileLogReader fileLogReader, CheckpointManagerFactory checkpointManagerFactory, StringPartitioner stringPartitioner) {
        this.kafkaLogReader = kafkaLogReader;
        this.fileLogReader = fileLogReader;
        this.checkpointManager = checkpointManagerFactory.create(KafkaTopic.getTopic(), 100);
        this.partitioner = stringPartitioner;
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLogNext(LoggingContext loggingContext, ReadRange readRange, int i, Filter filter, Callback callback) {
        if (readRange != ReadRange.LATEST) {
            if (readRange.getFromMillis() < getCheckpointTime(loggingContext)) {
                this.fileLogReader.getLogNext(loggingContext, readRange, i, filter, callback);
                if (callback.getCount() != 0) {
                    LOG.trace("Got {} log entries from file", Integer.valueOf(callback.getCount()));
                    return;
                }
            }
        }
        this.kafkaLogReader.getLogNext(loggingContext, readRange, i, filter, callback);
        LOG.trace("Got {} log entries from kafka", Integer.valueOf(callback.getCount()));
        if (callback.getCount() == 0) {
            this.fileLogReader.getLogNext(loggingContext, readRange, i, filter, callback);
            LOG.trace("Got {} log entries from file", Integer.valueOf(callback.getCount()));
        }
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLogPrev(LoggingContext loggingContext, ReadRange readRange, int i, Filter filter, Callback callback) {
        if (readRange != ReadRange.LATEST) {
            if (readRange.getToMillis() < getCheckpointTime(loggingContext)) {
                this.fileLogReader.getLogPrev(loggingContext, readRange, i, filter, callback);
                LOG.trace("Got {} log entries from file", Integer.valueOf(callback.getCount()));
                return;
            }
        }
        this.kafkaLogReader.getLogPrev(loggingContext, readRange, i, filter, callback);
        LOG.trace("Got {} log entries from kafka", Integer.valueOf(callback.getCount()));
        if (callback.getCount() == 0) {
            this.fileLogReader.getLogPrev(loggingContext, readRange, i, filter, callback);
            LOG.trace("Got {} log entries from file", Integer.valueOf(callback.getCount()));
        }
    }

    @Override // co.cask.cdap.logging.read.LogReader
    public void getLog(LoggingContext loggingContext, long j, long j2, Filter filter, Callback callback) {
        this.fileLogReader.getLog(loggingContext, j, j2, filter, callback);
    }

    private long getCheckpointTime(LoggingContext loggingContext) {
        try {
            return this.checkpointManager.getCheckpoint(this.partitioner.partition(loggingContext.getLogPartition(), -1)).getMaxEventTime();
        } catch (Exception e) {
            LOG.error("Got exception while reading checkpoint", e);
            return -1L;
        }
    }
}
