package org.apache.activemq.kaha.impl.async;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.LinkedNode;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:lib/activemq-core-5.6.0.jar:org/apache/activemq/kaha/impl/async/DataFileAppender.class */
public class DataFileAppender {
    /** 21-byte array (all zeros as allocated) written after the type byte of every record header. */
    protected static final byte[] RESERVED_SPACE = new byte[21];
    /** Default upper bound for the accumulated size of one write batch: 4194304 bytes (4 MiB). */
    protected static final int DEFAULT_MAX_BATCH_SIZE = 4194304;
    /** Data manager that allocates locations and is told the last appended location. */
    protected final AsyncDataManager dataManager;
    /** Map shared with the data manager; holds commands that were queued but not yet written. */
    protected final Map<WriteKey, WriteCommand> inflightWrites;
    /** Batch currently being filled by producers; read and written while holding {@link #enqueueMutex}. */
    protected WriteBatch nextWriteBatch;
    /** Set by {@link #close()}; once true, enqueueing fails and the writer thread exits when idle. */
    protected boolean shutdown;
    /** I/O failure recorded by the writer thread; rethrown to producers that enqueue afterwards. */
    protected IOException firstAsyncException;
    /** True once the writer thread has been started by the first enqueue. */
    protected boolean running;
    /** The writer thread running {@link #processQueue()}; created lazily on the first enqueue. */
    private Thread thread;
    /** Monitor guarding the batch hand-off between producers and the writer thread. */
    protected final Object enqueueMutex = new Object() { // from class: org.apache.activemq.kaha.impl.async.DataFileAppender.1
    };
    /** Released when the writer thread exits, or by {@link #close()} if it was never started. */
    protected final CountDownLatch shutdownDone = new CountDownLatch(1);
    /** Size limit checked by {@code WriteBatch.canAppend} and used as the writer buffer's initial capacity. */
    protected int maxWriteBatchSize = 4194304;

    /* loaded from: input_file:lib/activemq-core-5.6.0.jar:org/apache/activemq/kaha/impl/async/DataFileAppender$WriteBatch.class */
    /**
     * A chain of write commands that all target the same data file and are
     * handed to the writer thread as one unit.
     */
    public class WriteBatch {
        /** Data file every command of this batch is written to. */
        public final DataFile dataFile;
        /** Head of the linked chain of commands in this batch. */
        public final WriteCommand first;
        /** Sum of the location sizes of all commands in this batch. */
        public int size;
        /** Counted down by the writer thread once the batch has been written or has failed. */
        public final CountDownLatch latch = new CountDownLatch(1);
        /** Holds the I/O failure of this batch, if any. */
        public AtomicReference<IOException> exception = new AtomicReference<>();

        /**
         * Starts a batch containing a single command.
         *
         * @param dataFile     file the batch is written to
         * @param writeCommand first command of the batch
         */
        public WriteBatch(DataFile dataFile, WriteCommand writeCommand) throws IOException {
            this.dataFile = dataFile;
            this.first = writeCommand;
            this.size = writeCommand.location.getSize();
        }

        /**
         * Tells whether a command may join this batch.
         *
         * @return {@code true} when the command targets the same data file instance and the
         *         batch would stay strictly below the appender's maximum batch size
         */
        public boolean canAppend(DataFile dataFile, WriteCommand writeCommand) {
            if (dataFile != this.dataFile) {
                return false;
            }
            int grownSize = this.size + writeCommand.location.getSize();
            return grownSize < DataFileAppender.this.maxWriteBatchSize;
        }

        /**
         * Links the command behind the current tail of the chain and adds its size to the batch.
         */
        public void append(WriteCommand writeCommand) throws IOException {
            LinkedNode tail = this.first.getTailNode();
            tail.linkAfter(writeCommand);
            this.size = this.size + writeCommand.location.getSize();
        }
    }

    /* loaded from: input_file:lib/activemq-core-5.6.0.jar:org/apache/activemq/kaha/impl/async/DataFileAppender$WriteCommand.class */
    /**
     * One record waiting to be written: its target location, its payload and how
     * completion is signalled. Commands are linked nodes so they can be chained into a batch.
     */
    public static class WriteCommand extends LinkedNode {
        /** Location allocated for this record. */
        public final Location location;
        /** Payload bytes of the record. */
        public final ByteSequence data;
        /** True when the producer blocks until the record has been written. */
        final boolean sync;
        /** Callback run by the writer thread after the write; {@code null} when none was supplied. */
        public final Runnable onComplete;

        /**
         * Creates a command without a completion callback.
         *
         * @param z whether the write is synchronous
         */
        public WriteCommand(Location location, ByteSequence byteSequence, boolean z) {
            this(location, byteSequence, z, null);
        }

        /**
         * Creates an asynchronous command with a completion callback.
         *
         * @param runnable callback to run after the write, may be {@code null}
         */
        public WriteCommand(Location location, ByteSequence byteSequence, Runnable runnable) {
            this(location, byteSequence, false, runnable);
        }

        /** Shared initialisation for both public constructors. */
        private WriteCommand(Location location, ByteSequence payload, boolean synchronous, Runnable callback) {
            this.location = location;
            this.data = payload;
            this.sync = synchronous;
            this.onComplete = callback;
        }
    }

    /* loaded from: input_file:lib/activemq-core-5.6.0.jar:org/apache/activemq/kaha/impl/async/DataFileAppender$WriteKey.class */
    /**
     * Map key identifying a record by data file id and offset; the hash is
     * computed once at construction.
     */
    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        /**
         * Captures the data file id and offset of the given location.
         */
        public WriteKey(Location location) {
            int fileId = location.getDataFileId();
            long position = location.getOffset();
            this.file = fileId;
            this.offset = position;
            // XOR of id and offset, truncated to the low 32 bits.
            this.hash = (int) (fileId ^ position);
        }

        @Override
        public int hashCode() {
            return this.hash;
        }

        /**
         * Two keys are equal when both the file id and the offset match.
         */
        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey other = (WriteKey) obj;
                return this.file == other.file && this.offset == other.offset;
            }
            return false;
        }
    }

    public DataFileAppender(AsyncDataManager asyncDataManager) {
        this.dataManager = asyncDataManager;
        this.inflightWrites = this.dataManager.getInflightWrites();
    }

    /**
     * Queues a record for writing and, for synchronous writes, blocks until the
     * writer thread has processed the batch containing it.
     *
     * @param byteSequence payload of the record
     * @param b            record type stored in the location and the record header
     * @param z            {@code true} to wait for the write to complete; {@code false} to
     *                     return immediately and register the command as in-flight
     * @return the location allocated for the record
     * @throws IOException            if the appender is shut down, the writer thread failed,
     *                                or (sync only) the batch containing the record failed
     * @throws InterruptedIOException if the calling thread is interrupted while waiting;
     *                                the thread's interrupt flag is restored
     */
    public Location storeItem(ByteSequence byteSequence, byte b, boolean z) throws IOException {
        // 32 bytes of per-record overhead: size int + type byte + RESERVED_SPACE, plus the
        // start/end markers (marker lengths are defined in AsyncDataManager, not visible here).
        int length = byteSequence.getLength() + 32;
        Location location = new Location();
        location.setSize(length);
        location.setType(b);
        WriteCommand writeCommand = new WriteCommand(location, byteSequence, z);
        WriteBatch batch;
        synchronized (this) {
            DataFile dataFile = this.dataManager.allocateLocation(location);
            WriteKey key = null;
            if (!z) {
                // Register before enqueueing so the writer's removal cannot precede the put.
                key = new WriteKey(location);
                this.inflightWrites.put(key, writeCommand);
            }
            try {
                batch = enqueue(dataFile, writeCommand);
            } catch (IOException | RuntimeException e) {
                // The command never reached the writer, so nobody else will remove the entry.
                if (key != null) {
                    this.inflightWrites.remove(key);
                }
                throw e;
            }
        }
        location.setLatch(batch.latch);
        if (z) {
            try {
                batch.latch.await();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller after translating the exception.
                Thread.currentThread().interrupt();
                throw new InterruptedIOException();
            }
            IOException failure = batch.exception.get();
            if (failure != null) {
                throw failure;
            }
        }
        return location;
    }

    /**
     * Queues a record for asynchronous writing. The command is registered as
     * in-flight and the callback, if any, is run by the writer thread after the write.
     *
     * @param byteSequence payload of the record
     * @param b            record type stored in the location and the record header
     * @param runnable     callback run after the record has been written; may be {@code null}
     * @return the location allocated for the record
     * @throws IOException if the appender is shut down or the writer thread failed earlier
     */
    public Location storeItem(ByteSequence byteSequence, byte b, Runnable runnable) throws IOException {
        // 32 bytes of per-record overhead: size int + type byte + RESERVED_SPACE, plus the
        // start/end markers (marker lengths are defined in AsyncDataManager, not visible here).
        int length = byteSequence.getLength() + 32;
        Location location = new Location();
        location.setSize(length);
        location.setType(b);
        WriteCommand writeCommand = new WriteCommand(location, byteSequence, runnable);
        WriteBatch batch;
        synchronized (this) {
            DataFile dataFile = this.dataManager.allocateLocation(location);
            // Register before enqueueing so the writer's removal cannot precede the put.
            WriteKey key = new WriteKey(location);
            this.inflightWrites.put(key, writeCommand);
            try {
                batch = enqueue(dataFile, writeCommand);
            } catch (IOException | RuntimeException e) {
                // The command never reached the writer, so nobody else will remove the entry.
                this.inflightWrites.remove(key);
                throw e;
            }
        }
        location.setLatch(batch.latch);
        return location;
    }

    /**
     * Adds a command to the batch currently being filled, starting the writer
     * thread on first use. If the command does not fit into the pending batch the
     * caller blocks until the writer has taken that batch, then starts a new one.
     *
     * @param dataFile     data file the command's location was allocated in
     * @param writeCommand command to queue
     * @return the batch the command was added to
     * @throws IOException            if the appender is shut down or the writer thread
     *                                recorded an I/O failure
     * @throws InterruptedIOException if interrupted while waiting for the pending batch
     *                                to be taken; the interrupt flag is restored
     */
    private WriteBatch enqueue(DataFile dataFile, WriteCommand writeCommand) throws IOException {
        synchronized (this.enqueueMutex) {
            if (this.shutdown) {
                throw new IOException("Async Writter Thread Shutdown");
            }
            if (!this.running) {
                this.running = true;
                this.thread = new Thread() {
                    @Override
                    public void run() {
                        DataFileAppender.this.processQueue();
                    }
                };
                this.thread.setPriority(Thread.MAX_PRIORITY);
                this.thread.setDaemon(true);
                this.thread.setName("ActiveMQ Data File Writer");
                this.thread.start();
                this.firstAsyncException = null;
            }
            if (this.firstAsyncException != null) {
                throw this.firstAsyncException;
            }

            // Fast path: the pending batch has room for this command.
            if (this.nextWriteBatch != null && this.nextWriteBatch.canAppend(dataFile, writeCommand)) {
                this.nextWriteBatch.append(writeCommand);
                return this.nextWriteBatch;
            }

            // The pending batch is full or targets another file: wait for the writer to take it.
            while (this.nextWriteBatch != null) {
                try {
                    this.enqueueMutex.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException();
                }
                // Re-check after every wake-up so a shutdown or a writer failure is reported
                // instead of waiting for a batch hand-off that may never happen.
                if (this.shutdown) {
                    throw new IOException("Async Writter Thread Shutdown");
                }
                if (this.firstAsyncException != null) {
                    throw this.firstAsyncException;
                }
            }

            WriteBatch batch = new WriteBatch(dataFile, writeCommand);
            this.nextWriteBatch = batch;
            // notifyAll, not notify: producers and the writer wait on the same monitor, and a
            // single notification could be consumed by another producer instead of the writer.
            this.enqueueMutex.notifyAll();
            return batch;
        }
    }

    /**
     * Marks the appender as shut down and waits until the writer thread has
     * exited. If the writer thread was never started the call returns without blocking.
     * Calling it again simply waits on the already released latch.
     *
     * @throws InterruptedIOException if interrupted while waiting for the writer thread;
     *                                the thread's interrupt flag is restored
     */
    public void close() throws IOException {
        synchronized (this.enqueueMutex) {
            if (!this.shutdown) {
                this.shutdown = true;
                if (this.running) {
                    // Wake the writer (and any blocked producers) so they observe the flag.
                    this.enqueueMutex.notifyAll();
                } else {
                    // No writer thread exists, so nothing else will release the latch.
                    this.shutdownDone.countDown();
                }
            }
        }
        try {
            this.shutdownDone.await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller after translating the exception.
            Thread.currentThread().interrupt();
            throw new InterruptedIOException();
        }
    }

    /**
     * Body of the writer thread. Repeatedly takes the pending batch, writes its
     * records to the batch's data file, optionally forces them to disk, then
     * removes async commands from the in-flight map, runs completion callbacks
     * and releases the batch latch.
     *
     * <p>The loop ends when the appender is shut down and no batch is pending,
     * when an I/O error occurs, or when the thread is interrupted while idle.
     * On an I/O error the exception is recorded in {@code firstAsyncException}
     * and on the affected batches before their latches are released. In every
     * case the open file is closed and {@code shutdownDone} is counted down.
     */
    protected void processQueue() {
        DataFile dataFile = null;
        RandomAccessFile file = null;
        WriteBatch batch = null;
        try {
            DataByteArrayOutputStream buffer = new DataByteArrayOutputStream(this.maxWriteBatchSize);
            while (true) {
                // Block until a batch is available or shutdown is requested.
                synchronized (this.enqueueMutex) {
                    while (this.nextWriteBatch == null) {
                        if (this.shutdown) {
                            return;
                        }
                        this.enqueueMutex.wait();
                    }
                    batch = this.nextWriteBatch;
                    this.nextWriteBatch = null;
                    // Wake every producer blocked on the full batch; a single notify would
                    // leave the others waiting although the slot is free.
                    this.enqueueMutex.notifyAll();
                }

                // Switch files when the batch targets a different data file.
                if (dataFile != batch.dataFile) {
                    if (file != null) {
                        dataFile.closeRandomAccessFile(file);
                        // Drop the closed handle so it is never passed to another DataFile.
                        file = null;
                    }
                    dataFile = batch.dataFile;
                    file = dataFile.openRandomAccessFile(true);
                }

                WriteCommand command = batch.first;
                file.seek(command.location.getOffset());
                boolean forceToDisk = false;
                if (batch.size == command.location.getSize()) {
                    // Single-command batch: write straight to the file, skipping the buffer.
                    forceToDisk = command.sync | (command.onComplete != null);
                    file.writeInt(command.location.getSize());
                    file.writeByte(command.location.getType());
                    file.write(RESERVED_SPACE);
                    file.write(AsyncDataManager.ITEM_HEAD_SOR);
                    file.write(command.data.getData(), command.data.getOffset(), command.data.getLength());
                    file.write(AsyncDataManager.ITEM_HEAD_EOR);
                } else {
                    // Several commands: assemble all records in memory and write them in one call.
                    while (command != null) {
                        forceToDisk |= command.sync | (command.onComplete != null);
                        buffer.writeInt(command.location.getSize());
                        buffer.writeByte(command.location.getType());
                        buffer.write(RESERVED_SPACE);
                        buffer.write(AsyncDataManager.ITEM_HEAD_SOR);
                        buffer.write(command.data.getData(), command.data.getOffset(), command.data.getLength());
                        buffer.write(AsyncDataManager.ITEM_HEAD_EOR);
                        command = (WriteCommand) command.getNext();
                    }
                    ByteSequence assembled = buffer.toByteSequence();
                    file.write(assembled.getData(), assembled.getOffset(), assembled.getLength());
                    buffer.reset();
                }
                // Force to disk only when a sync write or a completion callback is in the batch.
                if (forceToDisk) {
                    file.getFD().sync();
                }

                this.dataManager.setLastAppendLocation(((WriteCommand) batch.first.getTailNode()).location);

                for (WriteCommand done = batch.first; done != null; done = (WriteCommand) done.getNext()) {
                    if (!done.sync) {
                        this.inflightWrites.remove(new WriteKey(done.location));
                    }
                    if (done.onComplete != null) {
                        try {
                            done.onComplete.run();
                        } catch (Throwable th) {
                            // A failing callback must not kill the writer thread.
                            th.printStackTrace();
                        }
                    }
                }
                batch.latch.countDown();
                // The batch is complete; a later failure must not be attributed to it.
                batch = null;
            }
        } catch (IOException e) {
            synchronized (this.enqueueMutex) {
                this.firstAsyncException = e;
                // Publish the exception BEFORE releasing each latch: a sync producer reads
                // the exception right after await() returns and must not see null.
                if (batch != null) {
                    batch.exception.set(e);
                    batch.latch.countDown();
                }
                if (this.nextWriteBatch != null) {
                    this.nextWriteBatch.exception.set(e);
                    this.nextWriteBatch.latch.countDown();
                }
                // Wake any threads waiting on the monitor so they can observe the failure.
                this.enqueueMutex.notifyAll();
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting for work: fall through to cleanup and let the thread end.
        } finally {
            if (file != null) {
                try {
                    dataFile.closeRandomAccessFile(file);
                } catch (Throwable ignored) {
                    // Best-effort close on the way out; nothing useful can be done here.
                }
            }
            this.shutdownDone.countDown();
        }
    }
}
