/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedInputStreamImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobStoreBackedReadHandleImpl
implements ReadHandle {
    private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
    private static final int CACHE_TTL_SECONDS = Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 1800);
    private final long ledgerId;
    private final OffloadIndexBlock index;
    private final BackedInputStream inputStream;
    private final DataInputStream dataStream;
    private final ExecutorService executor;
    private final Cache<Long, Long> entryOffsets = CacheBuilder.newBuilder().expireAfterAccess((long)CACHE_TTL_SECONDS, TimeUnit.SECONDS).build();
    private State state = null;

    private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, BackedInputStream inputStream, ExecutorService executor) {
        this.ledgerId = ledgerId;
        this.index = index;
        this.inputStream = inputStream;
        this.dataStream = new DataInputStream(inputStream);
        this.executor = executor;
        this.state = State.Opened;
    }

    public long getId() {
        return this.ledgerId;
    }

    public LedgerMetadata getLedgerMetadata() {
        return this.index.getLedgerMetadata();
    }

    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        this.executor.execute(() -> {
            try {
                this.index.close();
                this.inputStream.close();
                this.entryOffsets.invalidateAll();
                this.state = State.Closed;
                promise.complete(null);
            }
            catch (IOException t) {
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
        if (log.isDebugEnabled()) {
            log.debug("Ledger {}: reading {} - {} ({} entries}", new Object[]{this.getId(), firstEntry, lastEntry, 1L + lastEntry - firstEntry});
        }
        CompletableFuture<LedgerEntries> promise = new CompletableFuture<LedgerEntries>();
        this.executor.execute(() -> {
            if (this.state == State.Closed) {
                log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
                promise.completeExceptionally((Throwable)new ManagedLedgerException.OffloadReadHandleClosedException());
                return;
            }
            ArrayList<LedgerEntryImpl> entries = new ArrayList<LedgerEntryImpl>();
            boolean seeked = false;
            try {
                if (firstEntry > lastEntry || firstEntry < 0L || lastEntry > this.getLastAddConfirmed()) {
                    promise.completeExceptionally((Throwable)new BKException.BKIncorrectParameterException());
                    return;
                }
                long entriesToRead = lastEntry - firstEntry + 1L;
                long nextExpectedId = firstEntry;
                if (this.dataStream.available() < 12) {
                    log.warn("There hasn't enough data to read, current available data has {} bytes, seek to the first entry {} to avoid EOF exception", (Object)this.inputStream.available(), (Object)firstEntry);
                    this.seekToEntry(firstEntry);
                }
                while (entriesToRead > 0L) {
                    long currentPosition = this.inputStream.getCurrentPosition();
                    int length = this.dataStream.readInt();
                    if (length < 0) {
                        this.seekToEntry(nextExpectedId);
                        continue;
                    }
                    long entryId = this.dataStream.readLong();
                    if (entryId == nextExpectedId) {
                        this.entryOffsets.put((Object)entryId, (Object)currentPosition);
                        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
                        entries.add(LedgerEntryImpl.create((long)this.ledgerId, (long)entryId, (long)length, (ByteBuf)buf));
                        for (int toWrite = length; toWrite > 0; toWrite -= buf.writeBytes((InputStream)this.dataStream, toWrite)) {
                        }
                        --entriesToRead;
                        ++nextExpectedId;
                        continue;
                    }
                    if (entryId > nextExpectedId && entryId < lastEntry) {
                        log.warn("The read entry {} is not the expected entry {} but in the range of {} - {}, seeking to the right position", new Object[]{entryId, nextExpectedId, nextExpectedId, lastEntry});
                        this.seekToEntry(nextExpectedId);
                        continue;
                    }
                    if (entryId < nextExpectedId && !this.index.getIndexEntryForEntry(nextExpectedId).equals(this.index.getIndexEntryForEntry(entryId))) {
                        log.warn("Read an unexpected entry id {} which is smaller than the next expected entry id {}, seeking to the right position", (Object)entryId, (Object)nextExpectedId);
                        this.seekToEntry(nextExpectedId);
                        continue;
                    }
                    if (entryId > lastEntry) {
                        if (!seeked) {
                            this.seekToEntry(nextExpectedId);
                            seeked = true;
                            continue;
                        }
                        log.info("Expected to read {}, but read {}, which is greater than last entry {}", new Object[]{nextExpectedId, entryId, lastEntry});
                        throw new BKException.BKUnexpectedConditionException();
                    }
                    long l = this.inputStream.skip(length);
                }
                promise.complete((LedgerEntries)LedgerEntriesImpl.create(entries));
            }
            catch (Throwable t) {
                log.error("Failed to read entries {} - {} from the offloader in ledger {}", new Object[]{firstEntry, lastEntry, this.ledgerId, t});
                promise.completeExceptionally(t);
                entries.forEach(LedgerEntry::close);
            }
        });
        return promise;
    }

    private void seekToEntry(long nextExpectedId) throws IOException {
        Long knownOffset = (Long)this.entryOffsets.getIfPresent((Object)nextExpectedId);
        if (knownOffset != null) {
            this.inputStream.seek(knownOffset);
        } else {
            long dataOffset = this.index.getIndexEntryForEntry(nextExpectedId).getDataOffset();
            this.inputStream.seek(dataOffset);
        }
    }

    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
        return this.readAsync(firstEntry, lastEntry);
    }

    public CompletableFuture<Long> readLastAddConfirmedAsync() {
        return CompletableFuture.completedFuture(this.getLastAddConfirmed());
    }

    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
        return CompletableFuture.completedFuture(this.getLastAddConfirmed());
    }

    public long getLastAddConfirmed() {
        return this.getLedgerMetadata().getLastEntryId();
    }

    public long getLength() {
        return this.getLedgerMetadata().getLength();
    }

    public boolean isClosed() {
        return this.getLedgerMetadata().isClosed();
    }

    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel) {
        CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<LastConfirmedAndEntry>();
        promise.completeExceptionally(new UnsupportedOperationException());
        return promise;
    }

    public static ReadHandle open(ScheduledExecutorService executor, BlobStore blobStore, String bucket, String key, String indexKey, DataBlockUtils.VersionCheck versionCheck, long ledgerId, int readBufferSize, LedgerOffloaderStats offloaderStats, String managedLedgerName) throws IOException {
        int retryCount = 3;
        OffloadIndexBlock index = null;
        IOException lastException = null;
        while (retryCount-- > 0) {
            long readIndexStartTime = System.nanoTime();
            Blob blob = blobStore.getBlob(bucket, indexKey);
            offloaderStats.recordReadOffloadIndexLatency(managedLedgerName, System.nanoTime() - readIndexStartTime, TimeUnit.NANOSECONDS);
            versionCheck.check(indexKey, blob);
            OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
            try (InputStream payLoadStream = blob.getPayload().openStream();){
                index = (OffloadIndexBlock)indexBuilder.fromStream(payLoadStream);
            }
            catch (IOException e) {
                log.warn("Failed to get index block from the offoaded index file {}, still have {} times to retry", new Object[]{indexKey, retryCount, e});
                lastException = e;
                continue;
            }
            lastException = null;
            break;
        }
        if (lastException != null) {
            throw lastException;
        }
        BlobStoreBackedInputStreamImpl inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key, versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName);
        return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
    }

    State getState() {
        return this.state;
    }

    static enum State {
        Opened,
        Closed;

    }
}

