/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bk_v4_2_0.bookkeeper.bookie;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.bk_v4_2_0.bookkeeper.bookie.Bookie;
import org.apache.bk_v4_2_0.bookkeeper.bookie.BufferedChannel;
import org.apache.bk_v4_2_0.bookkeeper.bookie.JournalChannel;
import org.apache.bk_v4_2_0.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bk_v4_2_0.bookkeeper.conf.ServerConfiguration;
import org.apache.bk_v4_2_0.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bk_v4_2_0.bookkeeper.util.IOUtils;
import org.apache.bk_v4_2_0.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Journal
extends Thread {
    static Logger LOG = LoggerFactory.getLogger(Journal.class);
    static final long MB = 0x100000L;
    final long maxJournalSize;
    final int maxBackupJournals;
    final File journalDirectory;
    final ServerConfiguration conf;
    private LastLogMark lastLogMark = new LastLogMark(0L, 0L);
    LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue();
    volatile boolean running = true;
    private LedgerDirsManager ledgerDirsManager;

    private static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
        File[] logFiles = journalDir.listFiles();
        ArrayList<Long> logs = new ArrayList<Long>();
        for (File f : logFiles) {
            String name = f.getName();
            if (!name.endsWith(".txn")) continue;
            String idString = name.split("\\.")[0];
            long id = Long.parseLong(idString, 16);
            if (filter != null) {
                if (!filter.accept(id)) continue;
                logs.add(id);
                continue;
            }
            logs.add(id);
        }
        Collections.sort(logs);
        return logs;
    }

    public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) {
        super("BookieJournal-" + conf.getBookiePort());
        this.ledgerDirsManager = ledgerDirsManager;
        this.conf = conf;
        this.journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDir());
        this.maxJournalSize = conf.getMaxJournalSize() * 0x100000L;
        this.maxBackupJournals = conf.getMaxBackupJournals();
        this.lastLogMark.readLog();
        LOG.debug("Last Log Mark : {}", (Object)this.lastLogMark);
    }

    LastLogMark getLastLogMark() {
        return this.lastLogMark;
    }

    public void markLog() {
        this.lastLogMark.markLog();
    }

    public void rollLog() throws LedgerDirsManager.NoWritableLedgerDirException {
        this.lastLogMark.rollLog();
    }

    public void gcJournals() {
        List<Long> logs = Journal.listJournalIds(this.journalDirectory, new JournalRollingFilter());
        if (logs.size() >= this.maxBackupJournals) {
            int maxIdx = logs.size() - this.maxBackupJournals;
            for (int i = 0; i < maxIdx; ++i) {
                long id = logs.get(i);
                if (id >= this.lastLogMark.getLastMark().getTxnLogId()) continue;
                File journalFile = new File(this.journalDirectory, Long.toHexString(id) + ".txn");
                if (!journalFile.delete()) {
                    LOG.warn("Could not delete old journal file {}", (Object)journalFile);
                }
                LOG.info("garbage collected journal " + journalFile.getName());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void scanJournal(long journalId, long journalPos, JournalScanner scanner) throws IOException {
        JournalChannel recLog = journalPos <= 0L ? new JournalChannel(this.journalDirectory, journalId) : new JournalChannel(this.journalDirectory, journalId, journalPos);
        int journalVersion = recLog.getFormatVersion();
        try {
            ByteBuffer lenBuff = ByteBuffer.allocate(4);
            ByteBuffer recBuff = ByteBuffer.allocate(65536);
            while (true) {
                long offset = recLog.fc.position();
                lenBuff.clear();
                Journal.fullRead(recLog, lenBuff);
                if (lenBuff.remaining() != 0) {
                    break;
                }
                lenBuff.flip();
                int len = lenBuff.getInt();
                if (len == 0) {
                    break;
                }
                recBuff.clear();
                if (recBuff.remaining() < len) {
                    recBuff = ByteBuffer.allocate(len);
                }
                recBuff.limit(len);
                if (Journal.fullRead(recLog, recBuff) != len) {
                    break;
                }
                recBuff.flip();
                scanner.process(journalVersion, offset, recBuff);
            }
        }
        finally {
            recLog.close();
        }
    }

    public void replay(JournalScanner scanner) throws IOException {
        final long markedLogId = this.lastLogMark.getTxnLogId();
        List<Long> logs = Journal.listJournalIds(this.journalDirectory, new JournalIdFilter(){

            @Override
            public boolean accept(long journalId) {
                return journalId >= markedLogId;
            }
        });
        if (markedLogId > 0L && (logs.size() == 0 || logs.get(0) != markedLogId)) {
            throw new IOException("Recovery log " + markedLogId + " is missing");
        }
        LOG.debug("Try to relay journal logs : {}", logs);
        for (Long id : logs) {
            long logPosition = 0L;
            if (id == markedLogId) {
                logPosition = this.lastLogMark.getTxnLogPosition();
            }
            this.scanJournal(id, logPosition, scanner);
        }
    }

    public void logAddEntry(ByteBuffer entry, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx) {
        long ledgerId = entry.getLong();
        long entryId = entry.getLong();
        entry.rewind();
        this.queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
    }

    public int getJournalQueueLength() {
        return this.queue.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    public void run() {
        LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
        ByteBuffer lenBuff = ByteBuffer.allocate(4);
        JournalChannel logFile = null;
        try {
            long logId = 0L;
            BufferedChannel bc = null;
            long lastFlushPosition = 0L;
            QueueEntry qe = null;
            while (true) {
                if (null == logFile) {
                    logId = MathUtils.now();
                    logFile = new JournalChannel(this.journalDirectory, logId);
                    bc = logFile.getBufferedChannel();
                    lastFlushPosition = 0L;
                }
                if (qe == null) {
                    if (toFlush.isEmpty()) {
                        qe = this.queue.take();
                    } else {
                        qe = this.queue.poll();
                        if (qe == null || bc.position() > lastFlushPosition + 524288L) {
                            bc.flush(true);
                            lastFlushPosition = bc.position();
                            this.lastLogMark.setLastLogMark(logId, lastFlushPosition);
                            for (QueueEntry e : toFlush) {
                                e.cb.writeComplete(0, e.ledgerId, e.entryId, null, e.ctx);
                            }
                            toFlush.clear();
                            if (bc.position() > this.maxJournalSize) {
                                logFile.close();
                                logFile = null;
                                continue;
                            }
                        }
                    }
                }
                if (!this.running) break;
                if (qe == null) continue;
                lenBuff.clear();
                lenBuff.putInt(qe.entry.remaining());
                lenBuff.flip();
                bc.write(lenBuff);
                bc.write(qe.entry);
                logFile.preAllocIfNeeded();
                toFlush.add(qe);
                qe = null;
            }
            LOG.info("Journal Manager is asked to shut down, quit.");
            logFile.close();
            logFile = null;
        }
        catch (IOException ioe) {
            LOG.error("I/O exception in Journal thread!", (Throwable)ioe);
            IOUtils.close(LOG, logFile);
        }
        catch (InterruptedException ie) {
            LOG.warn("Journal exits when shutting down", (Throwable)ie);
            {
                catch (Throwable throwable) {
                    IOUtils.close(LOG, logFile);
                    throw throwable;
                }
            }
            IOUtils.close(LOG, logFile);
        }
        IOUtils.close(LOG, logFile);
    }

    public synchronized void shutdown() {
        try {
            if (!this.running) {
                return;
            }
            this.running = false;
            this.interrupt();
            this.join();
        }
        catch (InterruptedException ie) {
            LOG.warn("Interrupted during shutting down journal : ", (Throwable)ie);
        }
    }

    private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {
        int total = 0;
        while (bb.remaining() > 0) {
            int rc = fc.read(bb);
            if (rc <= 0) {
                return total;
            }
            total += rc;
        }
        return total;
    }

    private static class QueueEntry {
        ByteBuffer entry;
        long ledgerId;
        long entryId;
        BookkeeperInternalCallbacks.WriteCallback cb;
        Object ctx;

        QueueEntry(ByteBuffer entry, long ledgerId, long entryId, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx) {
            this.entry = entry.duplicate();
            this.cb = cb;
            this.ctx = ctx;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    public static interface JournalScanner {
        public void process(int var1, long var2, ByteBuffer var4) throws IOException;
    }

    private class JournalRollingFilter
    implements JournalIdFilter {
        private JournalRollingFilter() {
        }

        @Override
        public boolean accept(long journalId) {
            return journalId < Journal.this.lastLogMark.getLastMark().getTxnLogId();
        }
    }

    class LastLogMark {
        private long txnLogId;
        private long txnLogPosition;
        private LastLogMark lastMark;

        LastLogMark(long logId, long logPosition) {
            this.txnLogId = logId;
            this.txnLogPosition = logPosition;
        }

        synchronized void setLastLogMark(long logId, long logPosition) {
            this.txnLogId = logId;
            this.txnLogPosition = logPosition;
        }

        synchronized void markLog() {
            this.lastMark = new LastLogMark(this.txnLogId, this.txnLogPosition);
        }

        synchronized LastLogMark getLastMark() {
            return this.lastMark;
        }

        synchronized long getTxnLogId() {
            return this.txnLogId;
        }

        synchronized long getTxnLogPosition() {
            return this.txnLogPosition;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        synchronized void rollLog() throws LedgerDirsManager.NoWritableLedgerDirException {
            byte[] buff = new byte[16];
            ByteBuffer bb = ByteBuffer.wrap(buff);
            bb.putLong(this.lastMark.getTxnLogId());
            bb.putLong(this.lastMark.getTxnLogPosition());
            LOG.debug("RollLog to persist last marked log : {}", (Object)this.lastMark);
            List<File> writableLedgerDirs = Journal.this.ledgerDirsManager.getWritableLedgerDirs();
            for (File dir : writableLedgerDirs) {
                File file = new File(dir, "lastMark");
                FileOutputStream fos = null;
                try {
                    fos = new FileOutputStream(file);
                    fos.write(buff);
                    fos.getChannel().force(true);
                    fos.close();
                    fos = null;
                }
                catch (IOException e) {
                    try {
                        LOG.error("Problems writing to " + file, (Throwable)e);
                    }
                    catch (Throwable throwable) {
                        IOUtils.close(LOG, fos);
                        throw throwable;
                    }
                    IOUtils.close(LOG, fos);
                    continue;
                }
                IOUtils.close(LOG, fos);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        synchronized void readLog() {
            byte[] buff = new byte[16];
            ByteBuffer bb = ByteBuffer.wrap(buff);
            for (File dir : Journal.this.ledgerDirsManager.getAllLedgerDirs()) {
                File file = new File(dir, "lastMark");
                try {
                    FileInputStream fis = new FileInputStream(file);
                    try {
                        int bytesRead = fis.read(buff);
                        if (bytesRead != 16) {
                            throw new IOException("Couldn't read enough bytes from lastMark. Wanted 16, got " + bytesRead);
                        }
                    }
                    finally {
                        fis.close();
                    }
                    bb.clear();
                    long i = bb.getLong();
                    long p = bb.getLong();
                    if (i <= this.txnLogId) continue;
                    this.txnLogId = i;
                    if (p <= this.txnLogPosition) continue;
                    this.txnLogPosition = p;
                }
                catch (IOException e) {
                    LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
                }
            }
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("LastMark: logId - ").append(this.txnLogId).append(" , position - ").append(this.txnLogPosition);
            return sb.toString();
        }
    }

    private static interface JournalIdFilter {
        public boolean accept(long var1);
    }
}

