/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockHeaderImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlockAwareSegmentInputStreamImpl
extends BlockAwareSegmentInputStream {
    private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
    static final int[] BLOCK_END_PADDING = new int[]{254, 220, 222, 173};
    private final ReadHandle ledger;
    private final long startEntryId;
    private final int blockSize;
    private int blockEntryCount;
    private int bytesReadOffset = 0;
    private int dataBlockFullOffset;
    private final InputStream dataBlockHeaderStream;
    private static final int ENTRIES_PER_READ = 100;
    static final int ENTRY_HEADER_SIZE = 12;
    private List<ByteBuf> entriesByteBuf = null;
    private LedgerOffloaderStats offloaderStats;
    private String topicName;

    public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) {
        this.ledger = ledger;
        this.startEntryId = startEntryId;
        this.blockSize = blockSize;
        this.dataBlockHeaderStream = DataBlockHeaderImpl.of(blockSize, startEntryId).toStream();
        this.blockEntryCount = 0;
        this.dataBlockFullOffset = blockSize;
        this.entriesByteBuf = Lists.newLinkedList();
    }

    public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize, LedgerOffloaderStats offloaderStats, String ledgerName) {
        this(ledger, startEntryId, blockSize);
        this.offloaderStats = offloaderStats;
        this.topicName = ledgerName;
    }

    private int readEntries() throws IOException {
        Preconditions.checkState((this.bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset() ? 1 : 0) != 0);
        Preconditions.checkState((this.bytesReadOffset < this.blockSize ? 1 : 0) != 0);
        if (this.bytesReadOffset < this.dataBlockFullOffset && this.entriesByteBuf.isEmpty() && this.startEntryId + (long)this.blockEntryCount <= this.ledger.getLastAddConfirmed()) {
            this.entriesByteBuf = this.readNextEntriesFromLedger(this.startEntryId + (long)this.blockEntryCount, 100L);
        }
        if (!this.entriesByteBuf.isEmpty() && this.bytesReadOffset + this.entriesByteBuf.get(0).readableBytes() <= this.blockSize) {
            ByteBuf entryByteBuf = this.entriesByteBuf.get(0);
            short ret = entryByteBuf.readUnsignedByte();
            ++this.bytesReadOffset;
            if (entryByteBuf.readableBytes() == 0) {
                entryByteBuf.release();
                this.entriesByteBuf.remove(0);
                ++this.blockEntryCount;
            }
            return ret;
        }
        if (this.dataBlockFullOffset == this.blockSize) {
            this.dataBlockFullOffset = this.bytesReadOffset;
        }
        return BLOCK_END_PADDING[(this.bytesReadOffset++ - this.dataBlockFullOffset) % BLOCK_END_PADDING.length];
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException {
        long end = Math.min(start + maxNumberEntries - 1L, this.ledger.getLastAddConfirmed());
        long startTime = System.nanoTime();
        try (LedgerEntries ledgerEntriesOnce = (LedgerEntries)this.ledger.readAsync(start, end).get();){
            if (log.isDebugEnabled()) {
                log.debug("read ledger entries. start: {}, end: {} cost {}", new Object[]{start, end, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime)});
            }
            if (this.offloaderStats != null && this.topicName != null) {
                this.offloaderStats.recordReadLedgerLatency(this.topicName, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
            }
            LinkedList entries = Lists.newLinkedList();
            for (LedgerEntry entry : ledgerEntriesOnce) {
                ByteBuf buf = entry.getEntryBuffer().retain();
                int entryLength = buf.readableBytes();
                long entryId = entry.getEntryId();
                CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
                ByteBuf entryHeaderBuf = PulsarByteBufAllocator.DEFAULT.buffer(12, 12);
                entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
                entryBuf.addComponents(true, new ByteBuf[]{entryHeaderBuf, buf});
                entries.add(entryBuf);
            }
            LinkedList linkedList = entries;
            return linkedList;
        }
        catch (InterruptedException | ExecutionException e) {
            log.error("Exception when get CompletableFuture<LedgerEntries>. ", (Throwable)e);
            if (!(e instanceof InterruptedException)) throw new IOException(e);
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    @Override
    public int read() throws IOException {
        if (this.dataBlockHeaderStream.available() > 0) {
            ++this.bytesReadOffset;
            return this.dataBlockHeaderStream.read();
        }
        if (this.bytesReadOffset < this.blockSize) {
            return this.readEntries();
        }
        return -1;
    }

    @Override
    public void close() throws IOException {
        super.close();
        this.dataBlockHeaderStream.close();
        if (!this.entriesByteBuf.isEmpty()) {
            this.entriesByteBuf.forEach(buf -> buf.release());
            this.entriesByteBuf.clear();
        }
    }

    @Override
    public ReadHandle getLedger() {
        return this.ledger;
    }

    @Override
    public long getStartEntryId() {
        return this.startEntryId;
    }

    @Override
    public int getBlockSize() {
        return this.blockSize;
    }

    @Override
    public int getBlockEntryCount() {
        return this.blockEntryCount;
    }

    @Override
    public long getEndEntryId() {
        if (this.blockEntryCount == 0) {
            return -1L;
        }
        return this.startEntryId + (long)this.blockEntryCount - 1L;
    }

    @Override
    public int getBlockEntryBytesCount() {
        return this.dataBlockFullOffset - DataBlockHeaderImpl.getDataStartOffset() - 12 * this.blockEntryCount;
    }

    public static long getHeaderSize() {
        return DataBlockHeaderImpl.getDataStartOffset();
    }

    public static int calculateBlockSize(int maxBlockSize, ReadHandle readHandle, long firstEntryToWrite, long entryBytesAlreadyWritten) {
        return (int)Math.min((long)maxBlockSize, (readHandle.getLastAddConfirmed() - firstEntryToWrite + 1L) * 12L + (readHandle.getLength() - entryBytesAlreadyWritten) + (long)DataBlockHeaderImpl.getDataStartOffset());
    }
}

