/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockHeaderImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Input stream that exposes a range of ledger entries as one fixed-size data block.
 *
 * <p>The bytes produced by this stream are, in order:
 * <ol>
 *   <li>the data block header obtained from {@link DataBlockHeaderImpl};</li>
 *   <li>a sequence of entries, each written as a 4-byte entry length, an 8-byte entry id
 *       and then the entry payload;</li>
 *   <li>padding built from the repeating {@code 0xFEDCDEAD} pattern until exactly
 *       {@code blockSize} bytes have been produced.</li>
 * </ol>
 *
 * <p>An entry is only emitted when it fits completely into the remaining space of the block;
 * the first entry that does not fit (or running out of entries) starts the padding.
 */
public class BlockAwareSegmentInputStreamImpl
extends BlockAwareSegmentInputStream {
    private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);

    /** Padding pattern as unsigned byte values, used by the single-byte read path. */
    static final int[] BLOCK_END_PADDING = new int[]{0xFE, 0xDC, 0xDE, 0xAD};
    /** The same padding pattern as raw bytes, used by the bulk read path. */
    static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD);

    /** Maximum number of entries requested from the ledger in one read. */
    private static final int ENTRIES_PER_READ = 100;
    /** Size of the per-entry header: 4 bytes of entry length + 8 bytes of entry id. */
    static final int ENTRY_HEADER_SIZE = 12;

    /** Reusable scratch buffer for padding bytes; released in {@link #close()}. */
    private final ByteBuf paddingBuf = PulsarByteBufAllocator.DEFAULT.buffer(128, 128);
    private final ReadHandle ledger;
    private final long startEntryId;
    private final int blockSize;
    /** Number of entries that have been completely emitted into this block so far. */
    private int blockEntryCount;
    /** Total number of bytes (header + entries + padding) handed out so far. */
    private int bytesReadOffset = 0;
    /** Offset at which padding starts; equals {@code blockSize} until padding has begun. */
    private int dataBlockFullOffset;
    private final InputStream dataBlockHeaderStream;
    /** Entries fetched from the ledger but not yet fully emitted; the head is the entry in progress. */
    private List<ByteBuf> entriesByteBuf;
    private LedgerOffloaderStats offloaderStats;
    private String managedLedgerName;
    private String topicName;
    private final AtomicBoolean close = new AtomicBoolean(false);

    /**
     * Creates a stream for one data block.
     *
     * @param ledger       handle the entries are read from
     * @param startEntryId id of the first entry to place in this block
     * @param blockSize    total size in bytes of the block, header and padding included
     */
    public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) {
        this.ledger = ledger;
        this.startEntryId = startEntryId;
        this.blockSize = blockSize;
        this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream();
        this.blockEntryCount = 0;
        this.dataBlockFullOffset = blockSize;
        this.entriesByteBuf = Lists.newLinkedList();
    }

    /**
     * Creates a stream for one data block that also records ledger read latency.
     *
     * @param ledger         handle the entries are read from
     * @param startEntryId   id of the first entry to place in this block
     * @param blockSize      total size in bytes of the block, header and padding included
     * @param offloaderStats sink for the read-latency metric; may be {@code null} to disable recording
     * @param ledgerName     managed ledger name, converted to a topic name for the metric
     */
    public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize,
                                            LedgerOffloaderStats offloaderStats, String ledgerName) {
        this(ledger, startEntryId, blockSize);
        this.offloaderStats = offloaderStats;
        this.managedLedgerName = ledgerName;
        this.topicName = TopicName.fromPersistenceNamingEncoding(ledgerName);
    }

    /**
     * Bulk read of the post-header part of the block.
     *
     * <p>Returns at most {@code len} bytes taken either from the entry currently in progress or,
     * once entries no longer fit, from the padding pattern. The returned buffer is retained on
     * behalf of the caller, who must release it.
     *
     * @param len maximum number of bytes wanted; the caller guarantees it does not exceed the
     *            space left in the block
     * @return a retained buffer holding the bytes read
     * @throws IOException if fetching entries from the ledger fails
     */
    private ByteBuf readEntries(int len) throws IOException {
        checkReadPosition();
        fetchEntriesIfNeeded();

        if (headEntryFitsInBlock()) {
            // Always consume from the head of the list; it is dropped once fully drained.
            ByteBuf entryByteBuf = entriesByteBuf.get(0);
            int read = Math.min(entryByteBuf.readableBytes(), len);
            // Slice from the buffer's own reader index so that this path and the single-byte
            // path share one cursor and can be freely interleaved.
            ByteBuf buf = entryByteBuf.readRetainedSlice(read);
            bytesReadOffset += read;
            finishEntryIfDrained(entryByteBuf);
            return buf;
        }

        markPaddingStart();
        paddingBuf.clear();
        int paddingLen = Math.min(len, paddingBuf.capacity());
        for (int i = 0; i < paddingLen; i++) {
            int patternIndex = (bytesReadOffset++ - dataBlockFullOffset) % BLOCK_END_PADDING_BYTES.length;
            paddingBuf.writeByte(BLOCK_END_PADDING_BYTES[patternIndex]);
        }
        // Retained so the caller can release it like an entry slice without freeing the scratch buffer.
        return paddingBuf.retain();
    }

    /**
     * Single-byte read of the post-header part of the block.
     *
     * @return the next byte as an unsigned value, taken from the current entry or from the padding
     * @throws IOException if fetching entries from the ledger fails
     */
    private int readEntries() throws IOException {
        checkReadPosition();
        fetchEntriesIfNeeded();

        if (headEntryFitsInBlock()) {
            ByteBuf entryByteBuf = entriesByteBuf.get(0);
            int ret = entryByteBuf.readUnsignedByte();
            bytesReadOffset++;
            finishEntryIfDrained(entryByteBuf);
            return ret;
        }

        markPaddingStart();
        return BLOCK_END_PADDING[(bytesReadOffset++ - dataBlockFullOffset) % BLOCK_END_PADDING.length];
    }

    /** Verifies that the read position is past the header and still inside the block. */
    private void checkReadPosition() {
        Preconditions.checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
        Preconditions.checkState(bytesReadOffset < blockSize);
    }

    /** Fetches the next batch of entries when none are buffered, padding has not started and the ledger has more. */
    private void fetchEntriesIfNeeded() throws IOException {
        if (bytesReadOffset < dataBlockFullOffset
                && entriesByteBuf.isEmpty()
                && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
        }
    }

    /** Tells whether a buffered entry exists whose remaining bytes fit in the space left in the block. */
    private boolean headEntryFitsInBlock() {
        return !entriesByteBuf.isEmpty()
                && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize;
    }

    /** Releases and removes the head entry once all of its bytes have been consumed, counting it as emitted. */
    private void finishEntryIfDrained(ByteBuf entryByteBuf) {
        if (entryByteBuf.readableBytes() == 0) {
            entryByteBuf.release();
            entriesByteBuf.remove(0);
            blockEntryCount++;
        }
    }

    /** Records, the first time padding is produced, the offset at which entry data ended. */
    private void markPaddingStart() {
        if (dataBlockFullOffset == blockSize) {
            dataBlockFullOffset = bytesReadOffset;
        }
    }

    /**
     * Reads a batch of entries from the ledger and prefixes each with its entry header.
     *
     * @param start            id of the first entry to read
     * @param maxNumberEntries upper bound on the number of entries to read; the range is also
     *                         capped at the ledger's last add confirmed entry
     * @return one buffer per entry, each holding the entry header followed by the payload
     * @throws IOException if the read fails or the waiting thread is interrupted
     */
    private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException {
        long end = Math.min(start + maxNumberEntries - 1, ledger.getLastAddConfirmed());
        long startTime = System.nanoTime();
        // try-with-resources closes the LedgerEntries on every path; each payload is retained
        // below so it remains usable after that close.
        try (LedgerEntries ledgerEntriesOnce = ledger.readAsync(start, end).get()) {
            if (log.isDebugEnabled()) {
                log.debug("read ledger entries. start: {}, end: {} cost {}", start, end,
                        TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime));
            }
            if (offloaderStats != null && managedLedgerName != null) {
                offloaderStats.recordReadLedgerLatency(topicName, System.nanoTime() - startTime,
                        TimeUnit.NANOSECONDS);
            }

            List<ByteBuf> entries = Lists.newLinkedList();
            for (LedgerEntry entry : ledgerEntriesOnce) {
                ByteBuf payload = entry.getEntryBuffer().retain();
                int entryLength = payload.readableBytes();
                long entryId = entry.getEntryId();

                CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
                ByteBuf entryHeaderBuf =
                        PulsarByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
                entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
                // 'true' advances the composite's writer index over the added components.
                entryBuf.addComponents(true, entryHeaderBuf, payload);

                entries.add(entryBuf);
            }
            return entries;
        } catch (InterruptedException | ExecutionException e) {
            log.error("Exception when get CompletableFuture<LedgerEntries>. ", e);
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new IOException(e);
        }
    }

    /**
     * Reads up to {@code len} bytes of the block into {@code b}.
     *
     * <p>A single call drains the remaining header bytes and then at most one chunk of entry
     * data or padding, so it may return fewer bytes than requested.
     *
     * @param b   destination array
     * @param off offset in {@code b} at which to start writing
     * @param len maximum number of bytes to read
     * @return the number of bytes read, {@code 0} if {@code len} is zero, or {@code -1} once the
     *         whole block has been produced
     * @throws NullPointerException      if {@code b} is {@code null}
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a range inside {@code b}
     * @throws IOException               if reading from the ledger fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException("The given bytes are null");
        }
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
        }
        if (len == 0) {
            return 0;
        }

        int offset = off;
        int readLen = len;
        int readBytes = 0;
        // The header always comes first.
        if (dataBlockHeaderStream.available() > 0) {
            int read = dataBlockHeaderStream.read(b, off, len);
            offset += read;
            readLen -= read;
            readBytes += read;
            bytesReadOffset += read;
        }
        if (readLen == 0) {
            return readBytes;
        }

        if (bytesReadOffset < blockSize) {
            // Never hand out more than the block has left.
            readLen = Math.min(readLen, blockSize - bytesReadOffset);
            ByteBuf chunk = readEntries(readLen);
            int read = chunk.readableBytes();
            chunk.readBytes(b, offset, read);
            chunk.release();
            return readBytes + read;
        }

        // Block exhausted: still report header bytes copied during this call instead of
        // discarding them behind an end-of-stream marker.
        return readBytes > 0 ? readBytes : -1;
    }

    /**
     * Reads the next byte of the block.
     *
     * @return the next byte as a value in {@code 0..255}, or {@code -1} once the whole block
     *         has been produced
     * @throws IOException if reading from the ledger fails
     */
    @Override
    public int read() throws IOException {
        if (dataBlockHeaderStream.available() > 0) {
            bytesReadOffset++;
            return dataBlockHeaderStream.read();
        }
        if (bytesReadOffset < blockSize) {
            return readEntries();
        }
        return -1;
    }

    /**
     * Closes the header stream and releases every buffer still held by this stream.
     * Only the first call has any effect.
     *
     * @throws IOException if closing the underlying streams fails
     */
    @Override
    public void close() throws IOException {
        if (close.compareAndSet(false, true)) {
            try {
                super.close();
                dataBlockHeaderStream.close();
            } finally {
                // Release buffers even when closing a stream throws, so they are not leaked.
                entriesByteBuf.forEach(ByteBuf::release);
                entriesByteBuf.clear();
                paddingBuf.clear();
                paddingBuf.release();
            }
        }
    }

    @Override
    public ReadHandle getLedger() {
        return ledger;
    }

    @Override
    public long getStartEntryId() {
        return startEntryId;
    }

    @Override
    public int getBlockSize() {
        return blockSize;
    }

    /**
     * @return the offset at which padding started, or the block size while no padding has been produced
     */
    public int getDataBlockFullOffset() {
        return dataBlockFullOffset;
    }

    /**
     * @return the number of entries completely emitted into this block so far
     */
    @Override
    public int getBlockEntryCount() {
        return blockEntryCount;
    }

    /**
     * @return the id of the last entry completely emitted, or {@code -1} if none has been emitted yet
     */
    @Override
    public long getEndEntryId() {
        if (blockEntryCount == 0) {
            return -1L;
        }
        return startEntryId + blockEntryCount - 1;
    }

    /**
     * @return the number of payload bytes in the block, i.e. the data area up to the padding
     *         start minus the block header and the per-entry headers
     */
    @Override
    public int getBlockEntryBytesCount() {
        return dataBlockFullOffset - DataBlockHeaderImpl.getDataStartOffset() - ENTRY_HEADER_SIZE * blockEntryCount;
    }

    /**
     * @return the size in bytes of the data block header
     */
    public static long getHeaderSize() {
        return DataBlockHeaderImpl.getDataStartOffset();
    }

    /**
     * Computes the size of the next block: enough for all remaining entries with their headers
     * plus the block header, capped at {@code maxBlockSize}.
     *
     * @param maxBlockSize             upper bound for the block size
     * @param readHandle               ledger whose length and last add confirmed entry are consulted
     * @param firstEntryToWrite        id of the first entry that would go into the block
     * @param entryBytesAlreadyWritten payload bytes of the ledger already placed in earlier blocks
     * @return the block size in bytes
     */
    public static int calculateBlockSize(int maxBlockSize, ReadHandle readHandle,
                                         long firstEntryToWrite, long entryBytesAlreadyWritten) {
        long remainingEntryHeaders =
                (readHandle.getLastAddConfirmed() - firstEntryToWrite + 1) * ENTRY_HEADER_SIZE;
        long remainingPayload = readHandle.getLength() - entryBytesAlreadyWritten;
        long wanted = remainingEntryHeaders + remainingPayload + DataBlockHeaderImpl.getDataStartOffset();
        // The result is bounded by maxBlockSize, so the narrowing cast cannot overflow.
        return (int) Math.min((long) maxBlockSize, wanted);
    }
}

