package org.apache.crunch.kafka.offset.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.crunch.kafka.offset.AbstractOffsetReader;
import org.apache.crunch.kafka.offset.hdfs.Offsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReader.class */
public class HDFSOffsetReader extends AbstractOffsetReader {
    private final Configuration config;
    private final Path baseOffsetStoragePath;
    private static final Logger LOG = LoggerFactory.getLogger(HDFSOffsetReader.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public HDFSOffsetReader(Configuration configuration, Path path) {
        if (configuration == null) {
            throw new IllegalArgumentException("The 'config' cannot be 'null'.");
        }
        if (path == null) {
            throw new IllegalArgumentException("The 'baseOffsetStoragePath' cannot be 'null'.");
        }
        this.config = configuration;
        this.baseOffsetStoragePath = path;
    }

    @Override // org.apache.crunch.kafka.offset.OffsetReader
    public Map<TopicPartition, Long> readLatestOffsets() throws IOException {
        Map<TopicPartition, Long> readOffsets;
        List<Long> storedOffsetPersistenceTimes = getStoredOffsetPersistenceTimes(true);
        if (!storedOffsetPersistenceTimes.isEmpty() && (readOffsets = readOffsets(storedOffsetPersistenceTimes.get(0).longValue())) != null) {
            return readOffsets;
        }
        return Collections.emptyMap();
    }

    @Override // org.apache.crunch.kafka.offset.AbstractOffsetReader, org.apache.crunch.kafka.offset.OffsetReader
    public Map<TopicPartition, Long> readOffsets(long j) throws IOException {
        Path persistedTimeStoragePath = HDFSOffsetWriter.getPersistedTimeStoragePath(this.baseOffsetStoragePath, j);
        FileSystem fileSystem = getFileSystem();
        if (!fileSystem.isFile(persistedTimeStoragePath)) {
            LOG.error("Offset file at {} is not a file or does not exist.", persistedTimeStoragePath);
            return null;
        }
        FSDataInputStream open = fileSystem.open(persistedTimeStoragePath);
        try {
            Offsets offsets = (Offsets) MAPPER.readValue(open, Offsets.class);
            HashMap hashMap = new HashMap();
            for (Offsets.PartitionOffset partitionOffset : offsets.getOffsets()) {
                hashMap.put(new TopicPartition(partitionOffset.getTopic(), partitionOffset.getPartition()), Long.valueOf(partitionOffset.getOffset()));
            }
            return hashMap;
        } finally {
            open.close();
        }
    }

    @Override // org.apache.crunch.kafka.offset.AbstractOffsetReader, org.apache.crunch.kafka.offset.OffsetReader
    public List<Long> getStoredOffsetPersistenceTimes() throws IOException {
        return getStoredOffsetPersistenceTimes(false);
    }

    private List<Long> getStoredOffsetPersistenceTimes(boolean z) throws IOException {
        LinkedList linkedList = new LinkedList();
        try {
            for (FileStatus fileStatus : getFileSystem().listStatus(this.baseOffsetStoragePath)) {
                if (fileStatus.isFile()) {
                    try {
                        linkedList.add(Long.valueOf(HDFSOffsetWriter.fileNameToPersistenceTime(fileStatus.getPath().getName())));
                    } catch (IllegalArgumentException e) {
                        LOG.info("Skipping file {} due to filename not being of the correct format.", fileStatus.getPath(), e);
                    }
                } else {
                    LOG.info("Skippping {} because it is not a file.", fileStatus.getPath());
                }
            }
        } catch (FileNotFoundException e2) {
            LOG.error("Unable to retrieve prior offsets.", e2);
        }
        if (z) {
            Collections.sort(linkedList, Collections.reverseOrder());
        } else {
            Collections.sort(linkedList);
        }
        return Collections.unmodifiableList(linkedList);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    protected FileSystem getFileSystem() throws IOException {
        return FileSystem.get(this.config);
    }
}
