package co.cask.cdap.logging.save;

import co.cask.cdap.logging.kafka.KafkaLogEvent;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import java.util.Iterator;
import javax.annotation.Nullable;

/* loaded from: input_file:co/cask/cdap/logging/save/AbstractKafkaLogProcessor.class */
/**
 * Base {@link KafkaLogProcessor} that drops the leading run of events already
 * covered by a checkpoint and hands whatever remains to {@link #doProcess(Iterator)}.
 *
 * <p>Only events at the front of the iterator are filtered: once the first
 * event that is not covered by the checkpoint is seen, it and every event
 * after it are forwarded to the subclass without further checks.
 */
public abstract class AbstractKafkaLogProcessor implements KafkaLogProcessor {
    /** Checkpoint compared against event offsets; {@code null} disables skipping. */
    private Checkpoint checkpoint;

    /**
     * Stores the checkpoint that {@link #alreadyProcessed(KafkaLogEvent)} compares against.
     *
     * @param checkpoint the checkpoint to compare event offsets with, or {@code null}
     *                   so that no event is treated as already processed
     */
    public void init(@Nullable Checkpoint checkpoint) {
        this.checkpoint = checkpoint;
    }

    /**
     * Consumes leading events for which {@link #alreadyProcessed(KafkaLogEvent)} is
     * {@code true}, then delegates the rest of the iterator to
     * {@link #doProcess(Iterator)}. {@code doProcess} is not invoked when no
     * events remain after skipping.
     *
     * @param it the events to process
     */
    @Override
    public void process(Iterator<KafkaLogEvent> it) {
        // Peeking lets us inspect the next event without consuming it, so the
        // first event that still needs processing is left in place for doProcess.
        PeekingIterator<KafkaLogEvent> events = Iterators.peekingIterator(it);
        while (events.hasNext() && alreadyProcessed(events.peek())) {
            events.next();
        }
        if (events.hasNext()) {
            doProcess(events);
        }
    }

    /**
     * Processes the events that remain after the leading already-processed
     * events have been skipped. Called by {@link #process(Iterator)} only when
     * at least one event remains.
     *
     * @param it the remaining events, starting at the first one not skipped
     */
    protected abstract void doProcess(Iterator<KafkaLogEvent> it);

    /**
     * Tells whether an event is covered by the current checkpoint.
     *
     * @param kafkaLogEvent the event to test
     * @return {@code true} if a checkpoint is set and the event's next offset is
     *         less than or equal to the checkpoint's next offset; {@code false}
     *         otherwise, including when no checkpoint is set
     */
    public boolean alreadyProcessed(KafkaLogEvent kafkaLogEvent) {
        return checkpoint != null && kafkaLogEvent.getNextOffset() <= checkpoint.getNextOffset();
    }
}
