package co.cask.cdap.logging.save;

import co.cask.cdap.logging.kafka.KafkaLogEvent;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Set;

/* loaded from: input_file:co/cask/cdap/logging/save/AbstractKafkaLogProcessor.class */
public abstract class AbstractKafkaLogProcessor implements KafkaLogProcessor {
    private final Map<Integer, Checkpoint> partitonCheckpoints = Maps.newHashMap();

    public void init(Set<Integer> set, CheckpointManager checkpointManager) {
        this.partitonCheckpoints.clear();
        try {
            for (Integer num : set) {
                this.partitonCheckpoints.put(num, checkpointManager.getCheckpoint(num.intValue()));
            }
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    @Override // co.cask.cdap.logging.save.KafkaLogProcessor
    public void process(KafkaLogEvent kafkaLogEvent) {
        if (alreadyProcessed(kafkaLogEvent)) {
            return;
        }
        doProcess(kafkaLogEvent);
    }

    protected abstract void doProcess(KafkaLogEvent kafkaLogEvent);

    public boolean alreadyProcessed(KafkaLogEvent kafkaLogEvent) {
        Checkpoint checkpoint = this.partitonCheckpoints.get(Integer.valueOf(kafkaLogEvent.getPartition()));
        return checkpoint != null && kafkaLogEvent.getNextOffset() <= checkpoint.getNextOffset();
    }
}
