/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.TopicName;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.options.GetOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BackedInputStream} that reads a single object from a jclouds {@link BlobStore}
 * through ranged GET requests.
 *
 * <p>Data is fetched in windows of at most {@code bufferSize} bytes into a fixed-capacity
 * buffer. Seeks that land inside the currently buffered window only move the buffer's reader
 * index; any other seek discards the window so that the next read fetches from the new position.
 *
 * <p>The read/seek state ({@code cursor} and the buffer offsets) is updated without any
 * synchronization. {@link #close()} may be called more than once; the buffer is released only
 * on the first call.
 */
public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
    private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedInputStreamImpl.class);

    /** Marker stored in the buffer offsets when the buffer holds no valid window. */
    private static final long NO_OFFSET = -1L;

    private final BlobStore blobStore;
    private final String bucket;
    private final String key;
    private final DataBlockUtils.VersionCheck versionCheck;
    private final ByteBuf buffer;
    private final long objectLen;
    private final int bufferSize;
    /** Guards the buffer release so that repeated {@link #close()} calls release it once. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Optional metrics sink; {@code null} when the stream was built without stats. */
    private LedgerOffloaderStats offloaderStats;
    private String managedLedgerName;
    private String topicName;
    /** Object offset of the next byte to request from the blob store. */
    private long cursor;
    /** Object offset of the first buffered byte, or {@link #NO_OFFSET}. */
    private long bufferOffsetStart;
    /** Object offset (inclusive) of the last buffered byte, or {@link #NO_OFFSET}. */
    private long bufferOffsetEnd;

    /**
     * Creates a stream that does not record offloader statistics.
     *
     * @param blobStore store to read from
     * @param bucket container holding the object
     * @param key name of the object inside {@code bucket}
     * @param versionCheck check applied to every blob returned by the store before it is read
     * @param objectLen total length of the object in bytes
     * @param bufferSize capacity of the read buffer, and therefore the maximum size of one
     *     ranged request
     */
    public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key,
                                          DataBlockUtils.VersionCheck versionCheck,
                                          long objectLen, int bufferSize) {
        this.blobStore = blobStore;
        this.bucket = bucket;
        this.key = key;
        this.versionCheck = versionCheck;
        // Initial and maximum capacity are identical, so the buffer never grows.
        this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
        this.objectLen = objectLen;
        this.bufferSize = bufferSize;
        this.cursor = 0L;
        this.bufferOffsetEnd = NO_OFFSET;
        this.bufferOffsetStart = NO_OFFSET;
    }

    /**
     * Creates a stream that records read latency, read bytes and read errors.
     *
     * @param blobStore store to read from
     * @param bucket container holding the object
     * @param key name of the object inside {@code bucket}
     * @param versionCheck check applied to every blob returned by the store before it is read
     * @param objectLen total length of the object in bytes
     * @param bufferSize capacity of the read buffer
     * @param offloaderStats metrics sink; may be {@code null}, in which case nothing is recorded
     * @param managedLedgerName managed ledger name; the topic name used for metrics is obtained
     *     from it via {@link TopicName#fromPersistenceNamingEncoding(String)}
     */
    public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key,
                                          DataBlockUtils.VersionCheck versionCheck,
                                          long objectLen, int bufferSize,
                                          LedgerOffloaderStats offloaderStats, String managedLedgerName) {
        this(blobStore, bucket, key, versionCheck, objectLen, bufferSize);
        this.offloaderStats = offloaderStats;
        this.managedLedgerName = managedLedgerName;
        this.topicName = TopicName.fromPersistenceNamingEncoding(managedLedgerName);
    }

    /**
     * Ensures the buffer holds at least one readable byte, fetching the next window from the
     * blob store when it is empty.
     *
     * @return {@code true} if the buffer has readable bytes, {@code false} if the end of the
     *     object has been reached
     * @throws IOException if the fetch fails for any reason; the stream position is left where
     *     it was before the attempt so the read can be retried
     */
    private boolean refillBufferIfNeeded() throws IOException {
        if (this.buffer.readableBytes() > 0) {
            return true;
        }
        if (this.cursor >= this.objectLen) {
            return false;
        }
        long startRange = this.cursor;
        long endRange = Math.min(this.cursor + (long) this.bufferSize - 1L, this.objectLen - 1L);
        // The range is inclusive and never wider than bufferSize, so the cast cannot overflow.
        int bytesToFill = (int) (endRange - startRange + 1L);
        if (log.isDebugEnabled()) {
            log.debug("refillBufferIfNeeded {} - {} ({} bytes to fill)", startRange, endRange, bytesToFill);
        }
        try {
            long startReadTime = System.nanoTime();
            Blob blob = this.blobStore.getBlob(this.bucket, this.key, new GetOptions().range(startRange, endRange));
            this.versionCheck.check(this.key, blob);
            try (InputStream stream = blob.getPayload().openStream()) {
                this.buffer.clear();
                int remaining = bytesToFill;
                while (remaining > 0) {
                    int copied = this.buffer.writeBytes(stream, remaining);
                    if (copied < 0) {
                        // writeBytes reports end-of-stream as a negative count; subtracting it
                        // would grow 'remaining' instead of terminating the loop.
                        throw new IOException(String.format(
                                "Unexpected end of stream for %s/%s: %d of %d bytes missing in range %d-%d",
                                this.bucket, this.key, remaining, bytesToFill, startRange, endRange));
                    }
                    remaining -= copied;
                }
            }
            // Publish the new window only once it has been read completely.
            this.bufferOffsetStart = startRange;
            this.bufferOffsetEnd = endRange;
            this.cursor = startRange + (long) this.buffer.readableBytes();
            if (this.offloaderStats != null) {
                // Measured after the payload was consumed, so the latency covers the whole transfer.
                this.offloaderStats.recordReadOffloadDataLatency(this.topicName,
                        System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
                this.offloaderStats.recordReadOffloadBytes(this.topicName, bytesToFill);
            }
        } catch (Throwable e) {
            // Drop any partially filled window and rewind, otherwise a later read could serve
            // the partial data and then fetch the same range again.
            invalidateBuffer();
            this.cursor = startRange;
            if (this.offloaderStats != null) {
                this.offloaderStats.recordReadOffloadError(this.topicName);
            }
            throw new IOException("Error reading from BlobStore", e);
        }
        return true;
    }

    /** Discards the buffered window so that the next read fetches from {@code cursor}. */
    private void invalidateBuffer() {
        this.bufferOffsetStart = NO_OFFSET;
        this.bufferOffsetEnd = NO_OFFSET;
        this.buffer.clear();
    }

    /**
     * Reads the next byte of the object.
     *
     * @return the byte as a value in {@code 0..255}, or {@code -1} at the end of the object
     * @throws IOException if fetching data from the blob store fails
     */
    @Override
    public int read() throws IOException {
        if (this.refillBufferIfNeeded()) {
            return this.buffer.readUnsignedByte();
        }
        return -1;
    }

    /**
     * Reads up to {@code len} bytes into {@code b}, never more than what is currently buffered
     * after at most one refill.
     *
     * @param b destination array
     * @param off index in {@code b} of the first byte written
     * @param len maximum number of bytes to read
     * @return the number of bytes read, {@code 0} if {@code len} is zero, or {@code -1} at the
     *     end of the object
     * @throws IOException if fetching data from the blob store fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (len == 0) {
            // InputStream contract: a zero-length read returns 0; it must not trigger a fetch.
            return 0;
        }
        if (this.refillBufferIfNeeded()) {
            int bytesToRead = Math.min(len, this.buffer.readableBytes());
            this.buffer.readBytes(b, off, bytesToRead);
            return bytesToRead;
        }
        return -1;
    }

    /**
     * Moves the read position to {@code position}. If the position lies inside the buffered
     * window only the buffer's reader index is changed; otherwise the window is discarded and
     * the next read fetches starting at {@code position}.
     *
     * @param position absolute offset within the object
     */
    @Override
    public void seek(long position) {
        log.debug("Seeking to {} on {}/{}, current position {} (bufStart:{}, bufEnd:{})",
                position, this.bucket, this.key, this.cursor, this.bufferOffsetStart, this.bufferOffsetEnd);
        if (position >= this.bufferOffsetStart && position <= this.bufferOffsetEnd) {
            this.buffer.readerIndex((int) (position - this.bufferOffsetStart));
        } else {
            invalidateBuffer();
            this.cursor = position;
        }
    }

    /**
     * Moves the read position forward to {@code position}.
     *
     * @param position absolute offset within the object; must not be before the current position
     * @throws IOException if {@code position} is before {@link #getCurrentPosition()}
     */
    @Override
    public void seekForward(long position) throws IOException {
        // Compare against the actual read position: 'cursor' is the offset of the next fetch
        // and sits past the buffered window, so using it would reject forward seeks that stay
        // inside the window.
        long current = this.getCurrentPosition();
        if (position < current) {
            throw new IOException(String.format("Error seeking, new position %d < current position %d", position, current));
        }
        this.seek(position);
    }

    /**
     * Returns the absolute offset of the next byte that a read would return.
     *
     * @return current position within the object
     */
    @Override
    public long getCurrentPosition() {
        if (this.bufferOffsetStart != NO_OFFSET) {
            return this.bufferOffsetStart + (long) this.buffer.readerIndex();
        }
        return this.cursor + (long) this.buffer.readerIndex();
    }

    /** Releases the read buffer. Calls after the first one have no effect. */
    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.buffer.release();
        }
    }

    /**
     * Returns the number of bytes between the current position and the end of the object,
     * clamped to the range {@code 0..Integer.MAX_VALUE}.
     *
     * @return remaining bytes; {@code 0} when positioned at or past the end of the object
     */
    @Override
    public int available() throws IOException {
        long remaining = this.objectLen - this.cursor + (long) this.buffer.readableBytes();
        // 'remaining' is negative after a seek beyond the end of the object.
        return (int) Math.max(0L, Math.min((long) Integer.MAX_VALUE, remaining));
    }
}

