/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedInputStreamImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobStoreBackedReadHandleImplV2
implements ReadHandle {
    private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImplV2.class);
    private final long ledgerId;
    private final List<OffloadIndexBlockV2> indices;
    private final List<BackedInputStream> inputStreams;
    private final List<DataInputStream> dataStreams;
    private final ExecutorService executor;

    private BlobStoreBackedReadHandleImplV2(long ledgerId, List<OffloadIndexBlockV2> indices, List<BackedInputStream> inputStreams, ExecutorService executor) {
        this.ledgerId = ledgerId;
        this.indices = indices;
        this.inputStreams = inputStreams;
        this.dataStreams = new LinkedList<DataInputStream>();
        for (BackedInputStream inputStream : inputStreams) {
            this.dataStreams.add(new DataInputStream(inputStream));
        }
        this.executor = executor;
    }

    public long getId() {
        return this.ledgerId;
    }

    public LedgerMetadata getLedgerMetadata() {
        return this.indices.get(this.indices.size() - 1).getLedgerMetadata(this.ledgerId);
    }

    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        this.executor.submit(() -> {
            try {
                for (OffloadIndexBlockV2 indexBlock : this.indices) {
                    indexBlock.close();
                }
                for (DataInputStream dataStream : this.dataStreams) {
                    dataStream.close();
                }
                promise.complete(null);
            }
            catch (IOException t) {
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
        log.debug("Ledger {}: reading {} - {}", new Object[]{this.getId(), firstEntry, lastEntry});
        CompletableFuture<LedgerEntries> promise = new CompletableFuture<LedgerEntries>();
        if (firstEntry > lastEntry || firstEntry < 0L || lastEntry > this.getLastAddConfirmed()) {
            promise.completeExceptionally(new IllegalArgumentException());
            return promise;
        }
        this.executor.submit(() -> {
            ArrayList<LedgerEntryImpl> entries = new ArrayList<LedgerEntryImpl>();
            List<GroupedReader> groupedReaders = null;
            try {
                groupedReaders = this.getGroupedReader(firstEntry, lastEntry);
            }
            catch (Exception e) {
                promise.completeExceptionally(e);
                return;
            }
            for (GroupedReader groupedReader : groupedReaders) {
                long entriesToRead = groupedReader.lastEntry - groupedReader.firstEntry + 1L;
                long nextExpectedId = groupedReader.firstEntry;
                try {
                    while (entriesToRead > 0L) {
                        int length = groupedReader.dataStream.readInt();
                        if (length < 0) {
                            groupedReader.inputStream.seek(groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId).getDataOffset());
                            continue;
                        }
                        long entryId = groupedReader.dataStream.readLong();
                        if (entryId == nextExpectedId) {
                            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
                            entries.add(LedgerEntryImpl.create((long)this.ledgerId, (long)entryId, (long)length, (ByteBuf)buf));
                            for (int toWrite = length; toWrite > 0; toWrite -= buf.writeBytes((InputStream)groupedReader.dataStream, toWrite)) {
                            }
                            --entriesToRead;
                            ++nextExpectedId;
                            continue;
                        }
                        if (entryId > nextExpectedId) {
                            groupedReader.inputStream.seek(groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId).getDataOffset());
                            continue;
                        }
                        if (entryId < nextExpectedId && !groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId).equals(groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, entryId))) {
                            groupedReader.inputStream.seek(groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId).getDataOffset());
                            continue;
                        }
                        if (entryId > groupedReader.lastEntry) {
                            log.info("Expected to read {}, but read {}, which is greater than last entry {}", new Object[]{nextExpectedId, entryId, groupedReader.lastEntry});
                            throw new BKException.BKUnexpectedConditionException();
                        }
                        long l = groupedReader.inputStream.skip(length);
                    }
                }
                catch (Throwable t) {
                    promise.completeExceptionally(t);
                    entries.forEach(LedgerEntry::close);
                }
                promise.complete((LedgerEntries)LedgerEntriesImpl.create(entries));
            }
        });
        return promise;
    }

    private List<GroupedReader> getGroupedReader(long firstEntry, long lastEntry) throws Exception {
        int i;
        LinkedList<GroupedReader> groupedReaders = new LinkedList<GroupedReader>();
        for (i = this.indices.size() - 1; i >= 0 && firstEntry <= lastEntry; --i) {
            OffloadIndexBlockV2 index = this.indices.get(i);
            long startEntryId = index.getStartEntryId(this.ledgerId);
            if (startEntryId > lastEntry) {
                log.debug("entries are in earlier indices, skip this segment ledger id: {}, begin entry id: {}", (Object)this.ledgerId, (Object)startEntryId);
                continue;
            }
            groupedReaders.add(new GroupedReader(this.ledgerId, startEntryId, lastEntry, index, this.inputStreams.get(i), this.dataStreams.get(i)));
            lastEntry = startEntryId - 1L;
        }
        Preconditions.checkArgument((firstEntry > lastEntry ? 1 : 0) != 0);
        for (i = 0; i < groupedReaders.size() - 1; ++i) {
            GroupedReader readerI = (GroupedReader)groupedReaders.get(i);
            GroupedReader readerII = (GroupedReader)groupedReaders.get(i + 1);
            Preconditions.checkArgument((readerI.ledgerId == readerII.ledgerId ? 1 : 0) != 0);
            Preconditions.checkArgument((readerI.firstEntry >= readerII.lastEntry ? 1 : 0) != 0);
        }
        return groupedReaders;
    }

    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
        return this.readAsync(firstEntry, lastEntry);
    }

    public CompletableFuture<Long> readLastAddConfirmedAsync() {
        return CompletableFuture.completedFuture(this.getLastAddConfirmed());
    }

    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
        return CompletableFuture.completedFuture(this.getLastAddConfirmed());
    }

    public long getLastAddConfirmed() {
        return this.getLedgerMetadata().getLastEntryId();
    }

    public long getLength() {
        return this.getLedgerMetadata().getLength();
    }

    public boolean isClosed() {
        return this.getLedgerMetadata().isClosed();
    }

    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel) {
        CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<LastConfirmedAndEntry>();
        promise.completeExceptionally(new UnsupportedOperationException());
        return promise;
    }

    public static ReadHandle open(ScheduledExecutorService executor, BlobStore blobStore, String bucket, List<String> keys, List<String> indexKeys, DataBlockUtils.VersionCheck versionCheck, long ledgerId, int readBufferSize, LedgerOffloaderStats offloaderStats, String managedLedgerName) throws IOException {
        LinkedList<BackedInputStream> inputStreams = new LinkedList<BackedInputStream>();
        LinkedList<OffloadIndexBlockV2> indice = new LinkedList<OffloadIndexBlockV2>();
        for (int i = 0; i < indexKeys.size(); ++i) {
            OffloadIndexBlockV2 index;
            String indexKey = indexKeys.get(i);
            String key = keys.get(i);
            log.debug("open bucket: {} index key: {}", (Object)bucket, (Object)indexKey);
            long startTime = System.nanoTime();
            Blob blob = blobStore.getBlob(bucket, indexKey);
            offloaderStats.recordReadOffloadIndexLatency(managedLedgerName, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
            log.debug("indexKey blob: {} {}", (Object)indexKey, (Object)blob);
            versionCheck.check(indexKey, blob);
            OffloadIndexBlockV2Builder indexBuilder = OffloadIndexBlockV2Builder.create();
            try (InputStream payloadStream = blob.getPayload().openStream();){
                index = indexBuilder.fromStream(payloadStream);
            }
            BlobStoreBackedInputStreamImpl inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key, versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName);
            inputStreams.add(inputStream);
            indice.add(index);
        }
        return new BlobStoreBackedReadHandleImplV2(ledgerId, indice, inputStreams, executor);
    }

    static class GroupedReader {
        public final long ledgerId;
        public final long firstEntry;
        public final long lastEntry;
        OffloadIndexBlockV2 index;
        BackedInputStream inputStream;
        DataInputStream dataStream;

        public String toString() {
            return "GroupedReader{ledgerId=" + this.ledgerId + ", firstEntry=" + this.firstEntry + ", lastEntry=" + this.lastEntry + '}';
        }

        public GroupedReader(long ledgerId, long firstEntry, long lastEntry, OffloadIndexBlockV2 index, BackedInputStream inputStream, DataInputStream dataStream) {
            this.ledgerId = ledgerId;
            this.firstEntry = firstEntry;
            this.lastEntry = lastEntry;
            this.index = index;
            this.inputStream = inputStream;
            this.dataStream = dataStream;
        }
    }
}

