package org.apache.nifi.wali;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.SerDeFactory;
import org.wali.SyncListener;
import org.wali.WriteAheadRepository;

/* loaded from: input_file:org/apache/nifi/wali/SequentialAccessWriteAheadLog.class */
public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T> {
    private static final int PARTITION_INDEX = 0;
    private static final Logger logger = LoggerFactory.getLogger(SequentialAccessWriteAheadLog.class);
    private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal");
    private static final int MAX_BUFFERS = 64;
    private static final int BUFFER_SIZE = 262144;
    private final File storageDirectory;
    private final File journalsDirectory;
    protected final SerDeFactory<T> serdeFactory;
    private final SyncListener syncListener;
    private final Set<String> recoveredSwapLocations;
    private final ReadWriteLock journalRWLock;
    private final Lock journalReadLock;
    private final Lock journalWriteLock;
    private final ObjectPool<ByteArrayDataOutputStream> streamPool;
    private final WriteAheadSnapshot<T> snapshot;
    private final RecordLookup<T> recordLookup;
    private SnapshotRecovery<T> snapshotRecovery;
    private volatile boolean recovered;
    private WriteAheadJournal<T> journal;
    private volatile long nextTransactionId;

    public SequentialAccessWriteAheadLog(File file, SerDeFactory<T> serDeFactory) throws IOException {
        this(file, serDeFactory, SyncListener.NOP_SYNC_LISTENER);
    }

    public SequentialAccessWriteAheadLog(File file, SerDeFactory<T> serDeFactory, SyncListener syncListener) throws IOException {
        this.recoveredSwapLocations = new HashSet();
        this.journalRWLock = new ReentrantReadWriteLock();
        this.journalReadLock = this.journalRWLock.readLock();
        this.journalWriteLock = this.journalRWLock.writeLock();
        this.streamPool = new BlockingQueuePool(MAX_BUFFERS, () -> {
            return new ByteArrayDataOutputStream(BUFFER_SIZE);
        }, byteArrayDataOutputStream -> {
            return byteArrayDataOutputStream.getByteArrayOutputStream().size() < BUFFER_SIZE;
        }, byteArrayDataOutputStream2 -> {
            byteArrayDataOutputStream2.getByteArrayOutputStream().reset();
        });
        this.recovered = false;
        this.nextTransactionId = 0L;
        if (!file.exists() && !file.mkdirs()) {
            throw new IOException("Directory " + String.valueOf(file) + " does not exist and cannot be created");
        }
        if (!file.isDirectory()) {
            throw new IOException("File " + String.valueOf(file) + " is a regular file and not a directory");
        }
        HashMapSnapshot hashMapSnapshot = new HashMapSnapshot(file, serDeFactory);
        this.snapshot = hashMapSnapshot;
        this.recordLookup = hashMapSnapshot;
        this.storageDirectory = file;
        this.journalsDirectory = new File(file, "journals");
        if (!this.journalsDirectory.exists() && !this.journalsDirectory.mkdirs()) {
            throw new IOException("Directory " + String.valueOf(this.journalsDirectory) + " does not exist and cannot be created");
        }
        this.recovered = false;
        this.serdeFactory = serDeFactory;
        this.syncListener = syncListener == null ? SyncListener.NOP_SYNC_LISTENER : syncListener;
    }

    @Override // org.wali.WriteAheadRepository
    public int update(Collection<T> collection, boolean z) throws IOException {
        if (!this.recovered) {
            throw new IllegalStateException("Cannot update repository until record recovery has been performed");
        }
        this.journalReadLock.lock();
        try {
            this.journal.update(collection, this.recordLookup);
            if (z) {
                this.journal.fsync();
                this.syncListener.onSync(PARTITION_INDEX);
            }
            this.snapshot.update(collection);
            return PARTITION_INDEX;
        } finally {
            this.journalReadLock.unlock();
        }
    }

    @Override // org.wali.WriteAheadRepository
    public synchronized Collection<T> recoverRecords() throws IOException {
        if (this.recovered) {
            throw new IllegalStateException("Cannot recover records from repository because record recovery has already commenced");
        }
        logger.info("Recovering records from Write-Ahead Log at {}", this.storageDirectory);
        long nanoTime = System.nanoTime();
        this.recovered = true;
        this.snapshotRecovery = this.snapshot.recover();
        this.recoveredSwapLocations.addAll(this.snapshotRecovery.getRecoveredSwapLocations());
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
        Map<Object, T> records = this.snapshotRecovery.getRecords();
        Set<String> recoveredSwapLocations = this.snapshotRecovery.getRecoveredSwapLocations();
        File[] listFiles = this.journalsDirectory.listFiles(this::isJournalFile);
        if (listFiles == null) {
            throw new IOException("Cannot access the list of files in directory " + String.valueOf(this.journalsDirectory) + "; please ensure that appropriate file permissions are set.");
        }
        if (this.snapshotRecovery.getRecoveryFile() == null) {
            logger.info("No Snapshot File to recover from at {}. Now recovering records from {} journal files", this.storageDirectory, Integer.valueOf(listFiles.length));
        } else {
            logger.info("Successfully recovered {} records and {} swap files from Snapshot at {} with Max Transaction ID of {} in {} milliseconds. Now recovering records from {} journal files", new Object[]{Integer.valueOf(records.size()), Integer.valueOf(recoveredSwapLocations.size()), this.snapshotRecovery.getRecoveryFile(), Long.valueOf(this.snapshotRecovery.getMaxTransactionId()), Long.valueOf(millis), Integer.valueOf(listFiles.length)});
        }
        List<File> asList = Arrays.asList(listFiles);
        Collections.sort(asList, new Comparator<File>() { // from class: org.apache.nifi.wali.SequentialAccessWriteAheadLog.1
            @Override // java.util.Comparator
            public int compare(File file, File file2) {
                return Long.compare(SequentialAccessWriteAheadLog.this.getMinTransactionId(file), SequentialAccessWriteAheadLog.this.getMinTransactionId(file2));
            }
        });
        long maxTransactionId = this.snapshotRecovery.getMaxTransactionId();
        int i = PARTITION_INDEX;
        int i2 = PARTITION_INDEX;
        int i3 = PARTITION_INDEX;
        long j = maxTransactionId;
        for (File file : asList) {
            long minTransactionId = getMinTransactionId(file);
            if (minTransactionId < maxTransactionId) {
                logger.debug("Will not recover records from journal file {} because the minimum Transaction ID for that journal is {} and the Transaction ID recovered from Snapshot was {}", new Object[]{file, Long.valueOf(minTransactionId), Long.valueOf(maxTransactionId)});
                i3++;
            } else {
                logger.debug("Min Transaction ID for journal {} is {}, so will recover records from journal", file, Long.valueOf(minTransactionId));
                i2++;
                LengthDelimitedJournal lengthDelimitedJournal = new LengthDelimitedJournal(file, this.serdeFactory, this.streamPool, 0L);
                try {
                    JournalRecovery recoverRecords = lengthDelimitedJournal.recoverRecords(records, recoveredSwapLocations);
                    int updateCount = recoverRecords.getUpdateCount();
                    logger.debug("Recovered {} updates from journal {}", Integer.valueOf(updateCount), file);
                    i += updateCount;
                    j = Math.max(j, recoverRecords.getMaxTransactionId());
                    lengthDelimitedJournal.close();
                } catch (Throwable th) {
                    try {
                        lengthDelimitedJournal.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            }
        }
        logger.debug("Recovered {} updates from {} journal files and skipped {} journal files because their data was already encapsulated in the snapshot", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3)});
        this.nextTransactionId = j + 1;
        logger.info("Successfully recovered {} records in {} milliseconds. Now checkpointing to ensure that Write-Ahead Log is in a consistent state", Integer.valueOf(records.size()), Long.valueOf(TimeUnit.MILLISECONDS.convert(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS)));
        this.recoveredSwapLocations.addAll(recoveredSwapLocations);
        checkpoint(this.recoveredSwapLocations);
        return records.values();
    }

    private long getMinTransactionId(File file) {
        String name = file.getName();
        return Long.parseLong(name.substring(PARTITION_INDEX, name.indexOf(".")));
    }

    private boolean isJournalFile(File file) {
        if (!file.isFile()) {
            return false;
        }
        return JOURNAL_FILENAME_PATTERN.matcher(file.getName()).matches();
    }

    @Override // org.wali.WriteAheadRepository
    public synchronized Set<String> getRecoveredSwapLocations() throws IOException {
        if (this.recovered) {
            return Collections.unmodifiableSet(this.recoveredSwapLocations);
        }
        throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed");
    }

    public SnapshotCapture<T> captureSnapshot() {
        return this.snapshot.prepareSnapshot(this.nextTransactionId - 1);
    }

    @Override // org.wali.WriteAheadRepository
    public int checkpoint() throws IOException {
        return checkpoint(null);
    }

    private int checkpoint(Set<String> set) throws IOException {
        long nanoTime = System.nanoTime();
        this.journalWriteLock.lock();
        try {
            if (this.journal != null) {
                JournalSummary summary = this.journal.getSummary();
                if (summary.getTransactionCount() == 0 && this.journal.isHealthy()) {
                    logger.debug("Will not checkpoint Write-Ahead Log because no updates have occurred since last checkpoint");
                    int recordCount = this.snapshot.getRecordCount();
                    this.journalWriteLock.unlock();
                    return recordCount;
                }
                try {
                    this.journal.fsync();
                } catch (Exception e) {
                    logger.error("Failed to synch Write-Ahead Log's journal to disk at {}", this.storageDirectory, e);
                }
                try {
                    this.journal.close();
                } catch (Exception e2) {
                    logger.error("Failed to close Journal while attempting to checkpoint Write-Ahead Log at {}", this.storageDirectory);
                }
                this.nextTransactionId = Math.max(this.nextTransactionId, summary.getLastTransactionId() + 1);
            }
            this.syncListener.onGlobalSync();
            File[] listFiles = this.journalsDirectory.listFiles(this::isJournalFile);
            File[] fileArr = listFiles == null ? new File[PARTITION_INDEX] : listFiles;
            SnapshotCapture<T> prepareSnapshot = set == null ? this.snapshot.prepareSnapshot(this.nextTransactionId - 1) : this.snapshot.prepareSnapshot(this.nextTransactionId - 1, set);
            File file = new File(this.journalsDirectory, this.nextTransactionId + ".journal");
            while (file.exists()) {
                this.nextTransactionId++;
                file = new File(this.journalsDirectory, this.nextTransactionId + ".journal");
            }
            this.journal = new LengthDelimitedJournal(file, this.serdeFactory, this.streamPool, this.nextTransactionId);
            this.journal.writeHeader();
            logger.debug("Created new Journal starting with Transaction ID {}", Long.valueOf(this.nextTransactionId));
            this.journalWriteLock.unlock();
            long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
            this.snapshot.writeSnapshot(prepareSnapshot);
            int length = fileArr.length;
            for (int i = PARTITION_INDEX; i < length; i++) {
                new LengthDelimitedJournal(fileArr[i], this.serdeFactory, this.streamPool, this.nextTransactionId).dispose();
            }
            logger.info("Checkpointed Write-Ahead Log with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds), max Transaction ID {}", new Object[]{Integer.valueOf(prepareSnapshot.getRecords().size()), Integer.valueOf(prepareSnapshot.getSwapLocations().size()), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime)), Long.valueOf(millis), Long.valueOf(prepareSnapshot.getMaxTransactionId())});
            return prepareSnapshot.getRecords().size();
        } catch (Throwable th) {
            this.journalWriteLock.unlock();
            throw th;
        }
    }

    @Override // org.wali.WriteAheadRepository
    public void shutdown() throws IOException {
        this.journalWriteLock.lock();
        try {
            if (this.journal != null) {
                this.journal.close();
            }
        } finally {
            this.journalWriteLock.unlock();
        }
    }
}
