package org.apache.crunch.kafka.offset.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.crunch.kafka.offset.AbstractOffsetWriter;
import org.apache.crunch.kafka.offset.hdfs.Offsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriter.class */
public class HDFSOffsetWriter extends AbstractOffsetWriter {
    public static final String FILE_FORMAT_EXTENSION = ".json";
    private final Configuration config;
    private final Path baseStoragePath;
    private static final Logger LOG = LoggerFactory.getLogger(HDFSOffsetWriter.class);
    public static final String PERSIST_TIME_FORMAT = "yyyy-MM-dd'T'HH-mm-ssZ";
    public static final DateTimeFormatter FILE_FORMATTER = DateTimeFormat.forPattern(PERSIST_TIME_FORMAT).withZoneUTC();
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public HDFSOffsetWriter(Configuration configuration, Path path) {
        if (configuration == null) {
            throw new IllegalArgumentException("The 'config' cannot be 'null'.");
        }
        if (path == null) {
            throw new IllegalArgumentException("The 'baseStoragePath' cannot be 'null'.");
        }
        this.config = configuration;
        this.baseStoragePath = path;
    }

    @Override // org.apache.crunch.kafka.offset.OffsetWriter
    public void write(long j, Map<TopicPartition, Long> map) throws IOException {
        if (map == null) {
            throw new IllegalArgumentException("The 'offsets' cannot be 'null'.");
        }
        if (j < 0) {
            throw new IllegalArgumentException("The 'asOfTime' cannot be less than 0.");
        }
        LinkedList linkedList = new LinkedList();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            linkedList.add(Offsets.PartitionOffset.Builder.newBuilder().setOffset(entry.getValue().longValue()).setTopic(entry.getKey().topic()).setPartition(entry.getKey().partition()).build());
        }
        Offsets build = Offsets.Builder.newBuilder().setOffsets(linkedList).setAsOfTime(j).build();
        FileSystem fileSystem = getFileSystem();
        Path persistedTimeStoragePath = getPersistedTimeStoragePath(this.baseStoragePath, j);
        LOG.debug("Writing offsets to {} with as of time {}", persistedTimeStoragePath, Long.valueOf(j));
        FSDataOutputStream create = fileSystem.create(getPersistedTimeStoragePath(this.baseStoragePath, j), true);
        Throwable th = null;
        try {
            try {
                MAPPER.writeValue(create, build);
                create.flush();
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                LOG.debug("Completed writing offsets to {}", persistedTimeStoragePath);
            } finally {
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    protected FileSystem getFileSystem() throws IOException {
        return FileSystem.get(this.config);
    }

    public static Path getPersistedTimeStoragePath(Path path, long j) {
        if (path == null) {
            throw new IllegalArgumentException("The 'baseStoragePath' cannot be 'null'.");
        }
        return new Path(path, persistenceTimeToFileName(j));
    }

    public static long fileNameToPersistenceTime(String str) {
        if (StringUtils.isBlank(str)) {
            throw new IllegalArgumentException("the 'fileName' cannot be 'null' or empty");
        }
        return FILE_FORMATTER.parseDateTime(StringUtils.strip(str, FILE_FORMAT_EXTENSION)).getMillis();
    }

    public static String persistenceTimeToFileName(long j) {
        return FILE_FORMATTER.print(new DateTime(j, DateTimeZone.UTC)) + FILE_FORMAT_EXTENSION;
    }
}
