package org.apache.tephra.persist;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.tephra.TxConstants;
import org.apache.tephra.metrics.MetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tephra/persist/AbstractTransactionLog.class */
public abstract class AbstractTransactionLog implements TransactionLog {
    private static final long SLOW_APPEND_THRESHOLD = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);
    private final MetricsCollector metricsCollector;
    protected long timestamp;
    private volatile boolean initialized;
    private volatile boolean closed;
    private TransactionLogWriter writer;
    private final AtomicLong logSequence = new AtomicLong();
    private AtomicLong syncedUpTo = new AtomicLong();
    private List<Entry> pendingWrites = Lists.newLinkedList();

    @VisibleForTesting
    @Deprecated
    /* loaded from: input_file:org/apache/tephra/persist/AbstractTransactionLog$CaskEntry.class */
    static class CaskEntry implements Writable {
        private LongWritable key;
        private co.cask.tephra.persist.TransactionEdit edit;

        public CaskEntry() {
            this.key = new LongWritable();
            this.edit = new co.cask.tephra.persist.TransactionEdit();
        }

        public CaskEntry(LongWritable longWritable, co.cask.tephra.persist.TransactionEdit transactionEdit) {
            this.key = longWritable;
            this.edit = transactionEdit;
        }

        public LongWritable getKey() {
            return this.key;
        }

        public co.cask.tephra.persist.TransactionEdit getEdit() {
            return this.edit;
        }

        public void write(DataOutput dataOutput) throws IOException {
            this.key.write(dataOutput);
            this.edit.write(dataOutput);
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.key.readFields(dataInput);
            this.edit.readFields(dataInput);
        }
    }

    /* loaded from: input_file:org/apache/tephra/persist/AbstractTransactionLog$Entry.class */
    public static class Entry implements Writable {
        private LongWritable key;
        private TransactionEdit edit;

        public Entry() {
            this.key = new LongWritable();
            this.edit = new TransactionEdit();
        }

        public Entry(LongWritable longWritable, TransactionEdit transactionEdit) {
            this.key = longWritable;
            this.edit = transactionEdit;
        }

        public LongWritable getKey() {
            return this.key;
        }

        public TransactionEdit getEdit() {
            return this.edit;
        }

        public void write(DataOutput dataOutput) throws IOException {
            this.key.write(dataOutput);
            this.edit.write(dataOutput);
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.key.readFields(dataInput);
            this.edit.readFields(dataInput);
        }
    }

    public AbstractTransactionLog(long j, MetricsCollector metricsCollector) {
        this.timestamp = j;
        this.metricsCollector = metricsCollector;
    }

    public synchronized void init() throws IOException {
        if (this.initialized) {
            return;
        }
        this.writer = createWriter();
        this.initialized = true;
    }

    protected abstract TransactionLogWriter createWriter() throws IOException;

    @Override // org.apache.tephra.persist.TransactionLog
    public abstract String getName();

    @Override // org.apache.tephra.persist.TransactionLog
    public long getTimestamp() {
        return this.timestamp;
    }

    @Override // org.apache.tephra.persist.TransactionLog
    public void append(TransactionEdit transactionEdit) throws IOException {
        long nanoTime = System.nanoTime();
        synchronized (this) {
            ensureAvailable();
            append(new Entry(new LongWritable(this.logSequence.getAndIncrement()), transactionEdit));
        }
        sync();
        long nanoTime2 = (System.nanoTime() - nanoTime) / TxConstants.MAX_TX_PER_MS;
        if (nanoTime2 > SLOW_APPEND_THRESHOLD) {
            LOG.info("Slow append to log " + getName() + ", took " + nanoTime2 + " msec.");
        }
    }

    @Override // org.apache.tephra.persist.TransactionLog
    public void append(List<TransactionEdit> list) throws IOException {
        long nanoTime = System.nanoTime();
        synchronized (this) {
            ensureAvailable();
            Iterator<TransactionEdit> it = list.iterator();
            while (it.hasNext()) {
                append(new Entry(new LongWritable(this.logSequence.getAndIncrement()), it.next()));
            }
        }
        sync();
        long nanoTime2 = (System.nanoTime() - nanoTime) / TxConstants.MAX_TX_PER_MS;
        if (nanoTime2 > SLOW_APPEND_THRESHOLD) {
            LOG.info("Slow append to log " + getName() + ", took " + nanoTime2 + " msec.");
        }
    }

    private void ensureAvailable() throws IOException {
        if (this.closed) {
            throw new IOException("Log " + getName() + " is already closed, cannot append!");
        }
        if (this.initialized) {
            return;
        }
        init();
    }

    private void append(Entry entry) throws IOException {
        this.pendingWrites.add(entry);
    }

    private List<Entry> getPendingWrites() {
        List<Entry> list;
        synchronized (this) {
            list = this.pendingWrites;
            this.pendingWrites = new LinkedList();
        }
        return list;
    }

    private void sync() throws IOException {
        long j = 0;
        int i = 0;
        synchronized (this) {
            if (this.closed) {
                return;
            }
            TransactionLogWriter transactionLogWriter = this.writer;
            List<Entry> pendingWrites = getPendingWrites();
            if (!pendingWrites.isEmpty()) {
                transactionLogWriter.commitMarker(pendingWrites.size());
            }
            for (Entry entry : pendingWrites) {
                transactionLogWriter.append(entry);
                i++;
                j = Math.max(j, entry.getKey().get());
            }
            long j2 = this.syncedUpTo.get();
            if (j2 < j) {
                transactionLogWriter.sync();
                this.metricsCollector.histogram("wal.sync.size", i);
                this.syncedUpTo.compareAndSet(j2, j);
            }
        }
    }

    @Override // org.apache.tephra.persist.TransactionLog
    public synchronized void close() throws IOException {
        if (this.closed) {
            return;
        }
        if (!this.pendingWrites.isEmpty()) {
            sync();
        }
        if (this.writer != null) {
            this.writer.close();
        }
        this.closed = true;
    }

    public boolean isClosed() {
        return this.closed;
    }

    @Override // org.apache.tephra.persist.TransactionLog
    public abstract TransactionLogReader getReader() throws IOException;
}
