package org.apache.bk_v4_1_0.bookkeeper.bookie;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.bk_v4_1_0.bookkeeper.conf.ServerConfiguration;
import org.apache.bk_v4_1_0.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bk_v4_1_0.bookkeeper.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/bk_v4_1_0/bookkeeper/bookie/Journal.class */
public class Journal extends Thread {
    static Logger LOG = LoggerFactory.getLogger(Journal.class);
    static final long MB = 1048576;
    final long maxJournalSize;
    final int maxBackupJournals;
    final File journalDirectory;
    final File[] ledgerDirectories;
    final ServerConfiguration conf;
    private LastLogMark lastLogMark;
    LinkedBlockingQueue<QueueEntry> queue;
    volatile boolean running;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/bk_v4_1_0/bookkeeper/bookie/Journal$JournalIdFilter.class */
    /**
     * Predicate over journal ids, used when listing journal files to decide
     * which ids are kept in the result.
     */
    public interface JournalIdFilter {
        /**
         * Decides whether a journal id should be included.
         *
         * @param journalId id parsed from a journal file name
         * @return {@code true} to keep the id, {@code false} to drop it
         */
        boolean accept(long journalId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/bk_v4_1_0/bookkeeper/bookie/Journal$JournalRollingFilter.class */
    /**
     * Filter that accepts only journals strictly older than the journal
     * referenced by the last marked log position.
     */
    public class JournalRollingFilter implements JournalIdFilter {
        private JournalRollingFilter() {
        }

        /**
         * @param j journal id to test
         * @return {@code true} when {@code j} is smaller than the txn log id of
         *         the last mark
         */
        @Override
        public boolean accept(long j) {
            long markedLogId = Journal.this.lastLogMark.getLastMark().getTxnLogId();
            return markedLogId > j;
        }
    }

    /* loaded from: input_file:org/apache/bk_v4_1_0/bookkeeper/bookie/Journal$JournalScanner.class */
    /**
     * Callback invoked once per record while a journal file is scanned.
     */
    public interface JournalScanner {
        /**
         * Handles one journal record.
         *
         * @param journalVersion format version reported by the journal channel
         * @param offset         channel position at which the record started
         * @param entry          buffer holding the record payload
         * @throws IOException if the record cannot be processed
         */
        void process(int journalVersion, long offset, ByteBuffer entry) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bk_v4_1_0/bookkeeper/bookie/Journal$LastLogMark.class */
    public class LastLogMark {
        long txnLogId;
        long txnLogPosition;
        LastLogMark lastMark;

        LastLogMark(long j, long j2) {
            this.txnLogId = j;
            this.txnLogPosition = j2;
        }

        synchronized void setLastLogMark(long j, long j2) {
            this.txnLogId = j;
            this.txnLogPosition = j2;
        }

        synchronized void markLog() {
            this.lastMark = new LastLogMark(this.txnLogId, this.txnLogPosition);
        }

        synchronized LastLogMark getLastMark() {
            return this.lastMark;
        }

        synchronized long getTxnLogId() {
            return this.txnLogId;
        }

        synchronized long getTxnLogPosition() {
            return this.txnLogPosition;
        }

        synchronized void rollLog() {
            byte[] bArr = new byte[16];
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            wrap.putLong(this.lastMark.getTxnLogId());
            wrap.putLong(this.lastMark.getTxnLogPosition());
            if (Journal.LOG.isDebugEnabled()) {
                Journal.LOG.debug("RollLog to persist last marked log : " + this.lastMark);
            }
            for (File file : Journal.this.ledgerDirectories) {
                File file2 = new File(file, "lastMark");
                try {
                    FileOutputStream fileOutputStream = new FileOutputStream(file2);
                    fileOutputStream.write(bArr);
                    fileOutputStream.getChannel().force(true);
                    fileOutputStream.close();
                } catch (IOException e) {
                    Journal.LOG.error("Problems writing to " + file2, e);
                }
            }
        }

        synchronized void readLog() {
            FileInputStream fileInputStream;
            int read;
            byte[] bArr = new byte[16];
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            for (File file : Journal.this.ledgerDirectories) {
                File file2 = new File(file, "lastMark");
                try {
                    fileInputStream = new FileInputStream(file2);
                    try {
                        read = fileInputStream.read(bArr);
                    } catch (Throwable th) {
                        fileInputStream.close();
                        throw th;
                        break;
                    }
                } catch (IOException e) {
                    Journal.LOG.error("Problems reading from " + file2 + " (this is okay if it is the first time starting this bookie");
                }
                if (read != 16) {
                    throw new IOException("Couldn't read enough bytes from lastMark. Wanted 16, got " + read);
                    break;
                }
                fileInputStream.close();
                wrap.clear();
                long j = wrap.getLong();
                long j2 = wrap.getLong();
                if (j > this.txnLogId) {
                    this.txnLogId = j;
                    if (j2 > this.txnLogPosition) {
                        this.txnLogPosition = j2;
                    }
                }
            }
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("LastMark: logId - ").append(this.txnLogId).append(" , position - ").append(this.txnLogPosition);
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/bk_v4_1_0/bookkeeper/bookie/Journal$QueueEntry.class */
    /**
     * One pending add request waiting to be written to the journal, together
     * with the callback to invoke once it has been flushed.
     */
    public static class QueueEntry {
        ByteBuffer entry;
        long ledgerId;
        long entryId;
        BookkeeperInternalCallbacks.WriteCallback cb;
        Object ctx;

        /**
         * @param byteBuffer    entry payload; a duplicate view is stored, so the
         *                      caller's position/limit are not shared
         * @param j             ledger id
         * @param j2            entry id
         * @param writeCallback callback to notify on completion
         * @param obj           opaque context handed back to the callback
         */
        QueueEntry(ByteBuffer byteBuffer, long j, long j2, BookkeeperInternalCallbacks.WriteCallback writeCallback, Object obj) {
            entry = byteBuffer.duplicate();
            ledgerId = j;
            entryId = j2;
            cb = writeCallback;
            ctx = obj;
        }
    }

    /**
     * Lists the ids of all {@code *.txn} journal files in a directory, sorted
     * ascending. The id is the part of the file name before the first dot,
     * parsed as hexadecimal.
     *
     * @param file            directory to list
     * @param journalIdFilter optional filter; when {@code null} every id is kept
     * @return a sorted, mutable list of ids; empty when the directory cannot be
     *         listed (does not exist, is not a directory, or an I/O error occurs)
     * @throws NumberFormatException if a {@code .txn} file name does not start
     *         with a hexadecimal number
     */
    private static List<Long> listJournalIds(File file, JournalIdFilter journalIdFilter) {
        List<Long> journalIds = new ArrayList<Long>();
        File[] candidates = file.listFiles();
        if (candidates == null) {
            // File.listFiles() returns null rather than an empty array when the
            // path is not a listable directory.
            return journalIds;
        }
        for (File candidate : candidates) {
            String name = candidate.getName();
            if (!name.endsWith(".txn")) {
                continue;
            }
            long id = Long.parseLong(name.split("\\.")[0], 16);
            if (journalIdFilter == null || journalIdFilter.accept(id)) {
                journalIds.add(Long.valueOf(id));
            }
        }
        Collections.sort(journalIds);
        return journalIds;
    }

    /**
     * Creates the journal thread (named {@code BookieJournal-<bookie port>}),
     * resolves the journal and ledger directories from the configuration, and
     * loads the last persisted log mark from the ledger directories.
     *
     * @param serverConfiguration bookie server configuration
     */
    public Journal(ServerConfiguration serverConfiguration) {
        super("BookieJournal-" + serverConfiguration.getBookiePort());
        lastLogMark = new LastLogMark(0L, 0L);
        queue = new LinkedBlockingQueue<QueueEntry>();
        running = true;
        conf = serverConfiguration;
        journalDirectory = Bookie.getCurrentDirectory(serverConfiguration.getJournalDir());
        ledgerDirectories = Bookie.getCurrentDirectories(serverConfiguration.getLedgerDirs());
        // Configured size is multiplied by 1048576 (bytes per MB) to get the byte limit.
        maxJournalSize = serverConfiguration.getMaxJournalSize() * 1048576;
        maxBackupJournals = serverConfiguration.getMaxBackupJournals();
        lastLogMark.readLog();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Last Log Mark : " + lastLogMark);
        }
    }

    /** @return the live log mark object owned by this journal */
    LastLogMark getLastLogMark() {
        return lastLogMark;
    }

    /** Snapshots the current log position; delegates to {@link LastLogMark#markLog()}. */
    public void markLog() {
        lastLogMark.markLog();
    }

    /** Persists the last snapshot to the ledger directories; delegates to {@link LastLogMark#rollLog()}. */
    public void rollLog() {
        lastLogMark.rollLog();
    }

    /**
     * Deletes old journal files. Only journals older than the last marked log
     * are candidates, and the newest {@code maxBackupJournals} of those
     * candidates are always retained.
     */
    public void gcJournals() {
        List<Long> oldJournals = listJournalIds(this.journalDirectory, new JournalRollingFilter());
        if (oldJournals.size() < this.maxBackupJournals) {
            return;
        }
        // Ids are sorted ascending, so the first `deletable` entries are the oldest.
        int deletable = oldJournals.size() - this.maxBackupJournals;
        for (int idx = 0; idx < deletable; idx++) {
            long journalId = oldJournals.get(idx).longValue();
            // Re-check against the mark: it may have changed since the listing was taken.
            if (journalId < this.lastLogMark.getLastMark().getTxnLogId()) {
                File journalFile = new File(this.journalDirectory, Long.toHexString(journalId) + ".txn");
                if (journalFile.delete()) {
                    LOG.info("garbage collected journal " + journalFile.getName());
                } else {
                    // Report only the failure; do not claim the file was collected.
                    LOG.warn("Could not delete old journal file {}", journalFile);
                }
            }
        }
    }

    /**
     * Reads one journal file record by record and hands every complete record
     * to the scanner. Each record is a 4-byte length followed by that many
     * payload bytes. Scanning stops at end of data, at a zero length, or at a
     * record whose payload is shorter than its declared length.
     *
     * @param j              journal id
     * @param j2             start position; when {@code <= 0} the channel is
     *                       opened without an explicit position
     * @param journalScanner callback receiving format version, record offset and payload
     * @throws IOException on channel errors or when the scanner fails
     */
    public void scanJournal(long j, long j2, JournalScanner journalScanner) throws IOException {
        JournalChannel recLog;
        if (j2 > 0) {
            recLog = new JournalChannel(this.journalDirectory, j, j2);
        } else {
            recLog = new JournalChannel(this.journalDirectory, j);
        }
        int journalVersion = recLog.getFormatVersion();
        try {
            ByteBuffer lenBuff = ByteBuffer.allocate(4);
            ByteBuffer recBuff = ByteBuffer.allocate(65536);
            for (;;) {
                // Offset at which this record starts.
                long offset = recLog.fc.position();
                lenBuff.clear();
                fullRead(recLog, lenBuff);
                if (lenBuff.hasRemaining()) {
                    // Length prefix incomplete: end of journal.
                    break;
                }
                lenBuff.flip();
                int len = lenBuff.getInt();
                if (len == 0) {
                    break;
                }
                recBuff.clear();
                if (len > recBuff.remaining()) {
                    // Record is larger than the reusable buffer; grow it.
                    recBuff = ByteBuffer.allocate(len);
                }
                recBuff.limit(len);
                if (fullRead(recLog, recBuff) != len) {
                    // Truncated payload: stop without delivering it.
                    break;
                }
                recBuff.flip();
                journalScanner.process(journalVersion, offset, recBuff);
            }
        } finally {
            recLog.close();
        }
    }

    /**
     * Replays every journal whose id is at or after the last marked log id.
     * The marked journal itself is scanned from the marked position; later
     * journals are scanned from position 0.
     *
     * @param journalScanner callback receiving each replayed record
     * @throws IOException if the marked journal is missing or scanning fails
     */
    public void replay(JournalScanner journalScanner) throws IOException {
        final long markedLogId = this.lastLogMark.getTxnLogId();
        JournalIdFilter fromMarkOnwards = new JournalIdFilter() {
            @Override
            public boolean accept(long j) {
                return j >= markedLogId;
            }
        };
        List<Long> logs = listJournalIds(this.journalDirectory, fromMarkOnwards);
        if (markedLogId > 0) {
            // The sorted list must begin with the marked journal itself.
            boolean markedLogPresent = logs.size() != 0 && logs.get(0).longValue() == markedLogId;
            if (!markedLogPresent) {
                throw new IOException("Recovery log " + markedLogId + " is missing");
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Try to relay journal logs : " + logs);
        }
        for (Long logId : logs) {
            long startPosition;
            if (logId.longValue() == markedLogId) {
                startPosition = this.lastLogMark.getTxnLogPosition();
            } else {
                startPosition = 0;
            }
            scanJournal(logId.longValue(), startPosition, journalScanner);
        }
    }

    /**
     * Queues an entry for the journal thread. The first two longs of the
     * buffer are read as ledger id and entry id, after which the buffer is
     * rewound so the full content is journaled.
     *
     * @param byteBuffer    entry data, starting with ledger id and entry id
     * @param writeCallback callback to invoke once the entry has been flushed
     * @param obj           opaque context passed back to the callback
     */
    public void logAddEntry(ByteBuffer byteBuffer, BookkeeperInternalCallbacks.WriteCallback writeCallback, Object obj) {
        long ledgerId = byteBuffer.getLong();
        long entryId = byteBuffer.getLong();
        byteBuffer.rewind();
        QueueEntry pending = new QueueEntry(byteBuffer, ledgerId, entryId, writeCallback, obj);
        queue.add(pending);
    }

    /** @return the number of entries currently waiting in the journal queue */
    public int getJournalQueueLength() {
        return queue.size();
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LinkedList linkedList = new LinkedList();
        ByteBuffer allocate = ByteBuffer.allocate(4);
        JournalChannel journalChannel = null;
        try {
            long j = 0;
            BufferedChannel bufferedChannel = null;
            long j2 = 0;
            QueueEntry queueEntry = null;
            while (true) {
                if (null == journalChannel) {
                    try {
                        j = System.currentTimeMillis();
                        journalChannel = new JournalChannel(this.journalDirectory, j);
                        bufferedChannel = journalChannel.getBufferedChannel();
                        j2 = 0;
                    } catch (IOException e) {
                        LOG.error("I/O exception in Journal thread!", e);
                        IOUtils.close(LOG, journalChannel);
                        return;
                    } catch (InterruptedException e2) {
                        LOG.warn("Journal exits when shutting down", e2);
                        IOUtils.close(LOG, journalChannel);
                        return;
                    }
                }
                if (queueEntry == null) {
                    if (linkedList.isEmpty()) {
                        queueEntry = this.queue.take();
                    } else {
                        queueEntry = this.queue.poll();
                        if (queueEntry == null || bufferedChannel.position() > j2 + 524288) {
                            bufferedChannel.flush(true);
                            j2 = bufferedChannel.position();
                            this.lastLogMark.setLastLogMark(j, j2);
                            Iterator it = linkedList.iterator();
                            while (it.hasNext()) {
                                QueueEntry queueEntry2 = (QueueEntry) it.next();
                                queueEntry2.cb.writeComplete(0, queueEntry2.ledgerId, queueEntry2.entryId, null, queueEntry2.ctx);
                            }
                            linkedList.clear();
                            if (bufferedChannel.position() > this.maxJournalSize) {
                                journalChannel.close();
                                journalChannel = null;
                            }
                        }
                    }
                }
                if (!this.running) {
                    LOG.info("Journal Manager is asked to shut down, quit.");
                    journalChannel.close();
                    journalChannel = null;
                    IOUtils.close(LOG, null);
                    return;
                }
                if (queueEntry != null) {
                    allocate.clear();
                    allocate.putInt(queueEntry.entry.remaining());
                    allocate.flip();
                    bufferedChannel.write(allocate);
                    bufferedChannel.write(queueEntry.entry);
                    journalChannel.preAllocIfNeeded();
                    linkedList.add(queueEntry);
                    queueEntry = null;
                }
            }
        } catch (Throwable th) {
            IOUtils.close(LOG, journalChannel);
            throw th;
        }
    }

    /**
     * Stops the journal thread: clears the running flag, interrupts the thread
     * and waits for it to finish. Does nothing if the flag is already cleared.
     * An interruption while waiting is logged and otherwise ignored.
     */
    public synchronized void shutdown() {
        if (!this.running) {
            return;
        }
        try {
            this.running = false;
            interrupt();
            join();
        } catch (InterruptedException e) {
            LOG.warn("Interrupted during shutting down journal : ", e);
        }
    }

    /**
     * Reads from the channel until the buffer is full or the channel stops
     * returning data.
     *
     * @param journalChannel channel to read from
     * @param byteBuffer     destination; filled from its current position up to its limit
     * @return total number of bytes read, which is less than the buffer's
     *         initial remaining space when a read returned zero or a negative count
     * @throws IOException if the underlying read fails
     */
    private static int fullRead(JournalChannel journalChannel, ByteBuffer byteBuffer) throws IOException {
        int total = 0;
        while (byteBuffer.remaining() > 0) {
            int read = journalChannel.read(byteBuffer);
            if (read <= 0) {
                // No more data available: report what was read so far.
                break;
            }
            total += read;
        }
        return total;
    }
}
