package org.apache.tephra.persist;

import com.google.common.annotations.VisibleForTesting;
import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.tephra.metrics.MetricsCollector;
import org.apache.tephra.persist.AbstractTransactionLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tephra/persist/HDFSTransactionLog.class */
public class HDFSTransactionLog extends AbstractTransactionLog {
    private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLog.class);
    private final FileSystem fs;
    private final Configuration hConf;
    private final Path logPath;

    @VisibleForTesting
    /* loaded from: input_file:org/apache/tephra/persist/HDFSTransactionLog$LogWriter.class */
    static final class LogWriter implements TransactionLogWriter {
        private final SequenceFile.Writer internalWriter;

        LogWriter(FileSystem fileSystem, Configuration configuration, Path path) throws IOException {
            SequenceFile.Metadata metadata = new SequenceFile.Metadata();
            metadata.set(new Text("version"), new Text(Byte.toString((byte) 3)));
            this.internalWriter = SequenceFile.createWriter(fileSystem, configuration, path, LongWritable.class, TransactionEdit.class, SequenceFile.CompressionType.NONE, (CompressionCodec) null, (Progressable) null, metadata);
            HDFSTransactionLog.LOG.debug("Created a new TransactionLog writer for " + path);
        }

        @Override // org.apache.tephra.persist.TransactionLogWriter
        public long getPosition() throws IOException {
            return this.internalWriter.getLength();
        }

        @Override // org.apache.tephra.persist.TransactionLogWriter
        public void append(AbstractTransactionLog.Entry entry) throws IOException {
            this.internalWriter.append(entry.getKey(), entry.getEdit());
        }

        @Override // org.apache.tephra.persist.TransactionLogWriter
        public void commitMarker(int i) throws IOException {
            CommitMarkerCodec.writeMarker(this.internalWriter, i);
        }

        @Override // org.apache.tephra.persist.TransactionLogWriter
        public void sync() throws IOException {
            this.internalWriter.syncFs();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.internalWriter.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HDFSTransactionLog(FileSystem fileSystem, Configuration configuration, Path path, long j, MetricsCollector metricsCollector) {
        super(j, metricsCollector, configuration);
        this.fs = fileSystem;
        this.hConf = configuration;
        this.logPath = path;
    }

    @Override // org.apache.tephra.persist.AbstractTransactionLog
    protected TransactionLogWriter createWriter() throws IOException {
        return new LogWriter(this.fs, this.hConf, this.logPath);
    }

    @Override // org.apache.tephra.persist.AbstractTransactionLog, org.apache.tephra.persist.TransactionLog
    public String getName() {
        return this.logPath.getName();
    }

    @Override // org.apache.tephra.persist.AbstractTransactionLog, org.apache.tephra.persist.TransactionLog
    public TransactionLogReader getReader() throws IOException {
        long len = this.fs.getFileStatus(this.logPath).getLen();
        if (len <= 0) {
            LOG.warn("File " + this.logPath + " might be still open, length is 0");
        }
        new HDFSUtil().recoverFileLease(this.fs, this.logPath, this.hConf);
        try {
            LOG.info("New file size for " + this.logPath + " is " + this.fs.getFileStatus(this.logPath).getLen());
            return new HDFSTransactionLogReaderSupplier(new SequenceFile.Reader(this.fs, this.logPath, this.hConf)).m242get();
        } catch (EOFException e) {
            if (len > 0) {
                return null;
            }
            LOG.warn("Could not open " + this.logPath + " for reading. File is empty", e);
            return null;
        }
    }
}
