/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.NonNull;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.OffloadSegmentInfoImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImplV2;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlockAwareSegmentInputStreamImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BufferedOffloadStream;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.StreamingDataBlockHeaderImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.BlobStoreLocation;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobBuilder;
import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.domain.MultipartPart;
import org.jclouds.blobstore.domain.MultipartUpload;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.io.payloads.InputStreamPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobStoreManagedLedgerOffloader
implements LedgerOffloader {
    private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class);
    private final OrderedScheduler scheduler;
    private final TieredStorageConfiguration config;
    private final Location writeLocation;
    private final Map<String, String> userMetadata;
    private final ConcurrentMap<BlobStoreLocation, BlobStore> blobStores = new ConcurrentHashMap<BlobStoreLocation, BlobStore>();
    private OffloadSegmentInfoImpl segmentInfo;
    private AtomicLong bufferLength = new AtomicLong(0L);
    private AtomicLong segmentLength = new AtomicLong(0L);
    private final long maxBufferLength;
    private final ConcurrentLinkedQueue<Entry> offloadBuffer = new ConcurrentLinkedQueue();
    private CompletableFuture<LedgerOffloader.OffloadResult> offloadResult;
    private volatile PositionImpl lastOfferedPosition = PositionImpl.LATEST;
    private final Duration maxSegmentCloseTime;
    private final long minSegmentCloseTimeMillis;
    private final long segmentBeginTimeMillis;
    private final long maxSegmentLength;
    private final int streamingBlockSize;
    private volatile ManagedLedger ml;
    private OffloadIndexBlockV2Builder streamingIndexBuilder;
    BlobStore blobStore;
    String streamingDataBlockKey;
    String streamingDataIndexKey;
    MultipartUpload streamingMpu = null;
    List<MultipartPart> streamingParts = Lists.newArrayList();

    public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config, Map<String, String> userMetadata, OrderedScheduler scheduler) throws IOException {
        return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata);
    }

    BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, OrderedScheduler scheduler, Map<String, String> userMetadata) {
        this.scheduler = scheduler;
        this.userMetadata = userMetadata;
        this.config = config;
        this.streamingBlockSize = config.getMinBlockSizeInBytes();
        this.maxSegmentCloseTime = Duration.ofSeconds(config.getMaxSegmentTimeInSecond());
        this.maxSegmentLength = config.getMaxSegmentSizeInBytes();
        this.minSegmentCloseTimeMillis = Duration.ofSeconds(config.getMinSegmentTimeInSecond()).toMillis();
        this.maxBufferLength = Math.max(config.getWriteBufferSizeInBytes(), config.getMinBlockSizeInBytes());
        this.segmentBeginTimeMillis = System.currentTimeMillis();
        this.writeLocation = !Strings.isNullOrEmpty((String)config.getRegion()) ? new LocationBuilder().scope(LocationScope.REGION).id(config.getRegion()).description(config.getRegion()).build() : null;
        log.info("Constructor offload driver: {}, host: {}, container: {}, region: {} ", new Object[]{config.getProvider().getDriver(), config.getServiceEndpoint(), config.getBucket(), config.getRegion()});
        this.blobStores.putIfAbsent(config.getBlobStoreLocation(), config.getBlobStore());
        log.info("The ledger offloader was created.");
    }

    public String getOffloadDriverName() {
        return this.config.getDriver();
    }

    public Map<String, String> getOffloadDriverMetadata() {
        return this.config.getOffloadDriverMetadata();
    }

    public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
        BlobStore writeBlobStore = (BlobStore)this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        this.scheduler.chooseThread(readHandle.getId()).submit(() -> {
            if (readHandle.getLength() == 0L || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0L) {
                promise.completeExceptionally(new IllegalArgumentException("An empty or open ledger should never be offloaded"));
                return;
            }
            OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create().withLedgerMetadata(readHandle.getLedgerMetadata()).withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
            String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(readHandle.getId(), uuid);
            String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(readHandle.getId(), uuid);
            MultipartUpload mpu = null;
            ArrayList parts = Lists.newArrayList();
            try {
                BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey);
                DataBlockUtils.addVersionInfo(blobBuilder, this.userMetadata);
                Blob blob = blobBuilder.build();
                mpu = writeBlobStore.initiateMultipartUpload(this.config.getBucket(), (BlobMetadata)blob.getMetadata(), new PutOptions());
            }
            catch (Throwable t) {
                promise.completeExceptionally(t);
                return;
            }
            long dataObjectLength = 0L;
            try {
                long startEntry = 0L;
                int partId = 1;
                long entryBytesWritten = 0L;
                while (startEntry <= readHandle.getLastAddConfirmed()) {
                    int blockSize = BlockAwareSegmentInputStreamImpl.calculateBlockSize(this.config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten);
                    try (BlockAwareSegmentInputStreamImpl blockStream = new BlockAwareSegmentInputStreamImpl(readHandle, startEntry, blockSize);){
                        InputStreamPayload partPayload = Payloads.newInputStreamPayload((InputStream)blockStream);
                        partPayload.getContentMetadata().setContentLength(Long.valueOf(blockSize));
                        partPayload.getContentMetadata().setContentType("application/octet-stream");
                        parts.add(writeBlobStore.uploadMultipartPart(mpu, partId, (Payload)partPayload));
                        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", new Object[]{this.config.getBucket(), dataBlockKey, partId, mpu.id()});
                        indexBuilder.addBlock(startEntry, partId, blockSize);
                        if (((BlockAwareSegmentInputStream)blockStream).getEndEntryId() == -1L) break;
                        startEntry = ((BlockAwareSegmentInputStream)blockStream).getEndEntryId() + 1L;
                        entryBytesWritten += (long)((BlockAwareSegmentInputStream)blockStream).getBlockEntryBytesCount();
                        ++partId;
                    }
                    dataObjectLength += (long)blockSize;
                }
                writeBlobStore.completeMultipartUpload(mpu, (List)parts);
                mpu = null;
            }
            catch (Throwable t) {
                try {
                    if (mpu != null) {
                        writeBlobStore.abortMultipartUpload(mpu);
                    }
                }
                catch (Throwable throwable) {
                    log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", new Object[]{this.config.getBucket(), dataBlockKey, mpu.id(), throwable});
                }
                promise.completeExceptionally(t);
                return;
            }
            try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
                 OffloadIndexBlock.IndexInputStream indexStream = index.toStream();){
                BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey);
                DataBlockUtils.addVersionInfo(blobBuilder, this.userMetadata);
                InputStreamPayload indexPayload = Payloads.newInputStreamPayload((InputStream)indexStream);
                indexPayload.getContentMetadata().setContentLength(Long.valueOf(indexStream.getStreamSize()));
                indexPayload.getContentMetadata().setContentType("application/octet-stream");
                Blob blob = blobBuilder.payload((Payload)indexPayload).contentLength(indexStream.getStreamSize()).build();
                writeBlobStore.putBlob(this.config.getBucket(), blob);
                promise.complete(null);
            }
            catch (Throwable t) {
                try {
                    writeBlobStore.removeBlob(this.config.getBucket(), dataBlockKey);
                }
                catch (Throwable throwable) {
                    log.error("Failed deleteObject in bucket - {} with key - {}.", new Object[]{this.config.getBucket(), dataBlockKey, throwable});
                }
                promise.completeExceptionally(t);
                return;
            }
        });
        return promise;
    }

    public CompletableFuture<LedgerOffloader.OffloadHandle> streamingOffload(@NonNull ManagedLedger ml, UUID uuid, long beginLedger, long beginEntry, Map<String, String> driverMetadata) {
        if (ml == null) {
            throw new NullPointerException("ml is marked non-null but is null");
        }
        if (this.ml != null) {
            log.error("streamingOffload should only be called once");
            CompletableFuture result = new CompletableFuture();
            result.completeExceptionally(new RuntimeException("streamingOffload should only be called once"));
        }
        this.ml = ml;
        this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger, beginEntry, this.config.getDriver(), driverMetadata);
        log.debug("begin offload with {}:{}", (Object)beginLedger, (Object)beginEntry);
        this.offloadResult = new CompletableFuture();
        this.blobStore = (BlobStore)this.blobStores.get(this.config.getBlobStoreLocation());
        this.streamingIndexBuilder = OffloadIndexBlockV2Builder.create();
        this.streamingDataBlockKey = this.segmentInfo.uuid.toString();
        this.streamingDataIndexKey = String.format("%s-index", this.segmentInfo.uuid);
        BlobBuilder blobBuilder = this.blobStore.blobBuilder(this.streamingDataBlockKey);
        DataBlockUtils.addVersionInfo(blobBuilder, this.userMetadata);
        Blob blob = blobBuilder.build();
        this.streamingMpu = this.blobStore.initiateMultipartUpload(this.config.getBucket(), (BlobMetadata)blob.getMetadata(), new PutOptions());
        this.scheduler.chooseThread((Object)this.segmentInfo).execute(() -> {
            log.info("start offloading segment: {}", (Object)this.segmentInfo);
            this.streamingOffloadLoop(1, 0);
        });
        this.scheduler.schedule(this::closeSegment, this.maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
        return CompletableFuture.completedFuture(new LedgerOffloader.OffloadHandle(){

            public Position lastOffered() {
                return BlobStoreManagedLedgerOffloader.this.lastOffered();
            }

            public CompletableFuture<Position> lastOfferedAsync() {
                return CompletableFuture.completedFuture(this.lastOffered());
            }

            public LedgerOffloader.OffloadHandle.OfferEntryResult offerEntry(Entry entry) {
                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
            }

            public CompletableFuture<LedgerOffloader.OffloadHandle.OfferEntryResult> offerEntryAsync(Entry entry) {
                return CompletableFuture.completedFuture(this.offerEntry(entry));
            }

            public CompletableFuture<LedgerOffloader.OffloadResult> getOffloadResultAsync() {
                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
            }

            public boolean close() {
                return BlobStoreManagedLedgerOffloader.this.closeSegment();
            }
        });
    }

    private void streamingOffloadLoop(int partId, int dataObjectLength) {
        log.debug("streaming offload loop {} {}", (Object)partId, (Object)dataObjectLength);
        if (this.segmentInfo.isClosed() && this.offloadBuffer.isEmpty()) {
            this.buildIndexAndCompleteResult(dataObjectLength);
            this.offloadResult.complete(this.segmentInfo.result());
        } else if (this.segmentInfo.isClosed() && !this.offloadBuffer.isEmpty() || this.bufferLength.get() >= (long)this.streamingBlockSize) {
            int blockEntrySize;
            int entrySize;
            LinkedList<Entry> entries = new LinkedList<Entry>();
            Entry firstEntry = this.offloadBuffer.poll();
            entries.add(firstEntry);
            long blockLedgerId = firstEntry.getLedgerId();
            long blockEntryId = firstEntry.getEntryId();
            for (blockEntrySize = 0; !this.offloadBuffer.isEmpty() && this.offloadBuffer.peek().getLedgerId() == blockLedgerId && blockEntrySize <= this.streamingBlockSize; blockEntrySize += entrySize) {
                Entry entryInBlock = this.offloadBuffer.poll();
                entrySize = entryInBlock.getLength();
                this.bufferLength.addAndGet(-entrySize);
                entries.add(entryInBlock);
            }
            int blockSize = BufferedOffloadStream.calculateBlockSize(this.streamingBlockSize, entries.size(), blockEntrySize);
            this.buildBlockAndUpload(blockSize, entries, blockLedgerId, blockEntryId, partId);
            this.streamingOffloadLoop(partId + 1, dataObjectLength + blockSize);
        } else {
            log.debug("not enough data, delay schedule for part: {} length: {}", (Object)partId, (Object)dataObjectLength);
            this.scheduler.chooseThread((Object)this.segmentInfo).schedule(() -> this.streamingOffloadLoop(partId, dataObjectLength), 100L, TimeUnit.MILLISECONDS);
        }
    }

    private void buildBlockAndUpload(int blockSize, List<Entry> entries, long blockLedgerId, long beginEntryId, int partId) {
        try (BufferedOffloadStream payloadStream = new BufferedOffloadStream(blockSize, entries, blockLedgerId, beginEntryId);){
            log.debug("begin upload payload: {} {}", (Object)blockLedgerId, (Object)beginEntryId);
            InputStreamPayload partPayload = Payloads.newInputStreamPayload((InputStream)payloadStream);
            partPayload.getContentMetadata().setContentType("application/octet-stream");
            this.streamingParts.add(this.blobStore.uploadMultipartPart(this.streamingMpu, partId, (Payload)partPayload));
            this.streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
            this.streamingIndexBuilder.addBlock(blockLedgerId, beginEntryId, partId, blockSize);
            MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)this.ml.getLedgerInfo(blockLedgerId).get();
            MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder ledgerInfoBuilder = MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder();
            if (ledgerInfo != null) {
                ledgerInfoBuilder.mergeFrom(ledgerInfo);
            }
            if (ledgerInfoBuilder.getEntries() == 0L) {
                ledgerInfoBuilder.setEntries(payloadStream.getEndEntryId() + 1L);
            }
            this.streamingIndexBuilder.addLedgerMeta(blockLedgerId, ledgerInfoBuilder.build());
            log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", new Object[]{this.config.getBucket(), this.streamingDataBlockKey, partId, this.streamingMpu.id()});
        }
        catch (Throwable e) {
            this.blobStore.abortMultipartUpload(this.streamingMpu);
            this.offloadResult.completeExceptionally(e);
            return;
        }
    }

    private void buildIndexAndCompleteResult(long dataObjectLength) {
        try {
            this.blobStore.completeMultipartUpload(this.streamingMpu, this.streamingParts);
            this.streamingIndexBuilder.withDataObjectLength(dataObjectLength);
            OffloadIndexBlockV2 index = this.streamingIndexBuilder.buildV2();
            OffloadIndexBlock.IndexInputStream indexStream = index.toStream();
            BlobBuilder indexBlobBuilder = this.blobStore.blobBuilder(this.streamingDataIndexKey);
            this.streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
            DataBlockUtils.addVersionInfo(indexBlobBuilder, this.userMetadata);
            try (InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload((InputStream)indexStream);){
                indexPayLoad.getContentMetadata().setContentLength(Long.valueOf(indexStream.getStreamSize()));
                indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
                Blob indexBlob = indexBlobBuilder.payload((Payload)indexPayLoad).contentLength(indexStream.getStreamSize()).build();
                this.blobStore.putBlob(this.config.getBucket(), indexBlob);
                LedgerOffloader.OffloadResult result = this.segmentInfo.result();
                this.offloadResult.complete(result);
                log.debug("offload segment completed {}", (Object)result);
            }
            catch (Exception e) {
                log.error("streaming offload failed", (Throwable)e);
                this.offloadResult.completeExceptionally(e);
            }
        }
        catch (Exception e) {
            log.error("streaming offload failed", (Throwable)e);
            this.offloadResult.completeExceptionally(e);
        }
    }

    private CompletableFuture<LedgerOffloader.OffloadResult> getOffloadResultAsync() {
        return this.offloadResult;
    }

    private synchronized LedgerOffloader.OffloadHandle.OfferEntryResult offerEntry(Entry entry) {
        if (this.segmentInfo.isClosed()) {
            log.debug("Segment already closed {}", (Object)this.segmentInfo);
            return LedgerOffloader.OffloadHandle.OfferEntryResult.FAIL_SEGMENT_CLOSED;
        }
        if (this.maxBufferLength <= this.bufferLength.get()) {
            return LedgerOffloader.OffloadHandle.OfferEntryResult.FAIL_BUFFER_FULL;
        }
        EntryImpl entryImpl = EntryImpl.create((long)entry.getLedgerId(), (long)entry.getEntryId(), (ByteBuf)entry.getDataBuffer());
        this.offloadBuffer.add((Entry)entryImpl);
        this.bufferLength.getAndAdd(entryImpl.getLength());
        this.segmentLength.getAndAdd(entryImpl.getLength());
        this.lastOfferedPosition = entryImpl.getPosition();
        if (this.segmentLength.get() >= this.maxSegmentLength && System.currentTimeMillis() - this.segmentBeginTimeMillis >= this.minSegmentCloseTimeMillis) {
            this.closeSegment();
        }
        return LedgerOffloader.OffloadHandle.OfferEntryResult.SUCCESS;
    }

    private synchronized boolean closeSegment() {
        boolean result = !this.segmentInfo.isClosed();
        log.debug("close segment {} {}", (Object)this.lastOfferedPosition.getLedgerId(), (Object)this.lastOfferedPosition.getEntryId());
        this.segmentInfo.closeSegment(this.lastOfferedPosition.getLedgerId(), this.lastOfferedPosition.getEntryId());
        return result;
    }

    private PositionImpl lastOffered() {
        return this.lastOfferedPosition;
    }

    private BlobStoreLocation getBlobStoreLocation(Map<String, String> offloadDriverMetadata) {
        return !offloadDriverMetadata.isEmpty() ? new BlobStoreLocation(offloadDriverMetadata) : new BlobStoreLocation(this.getOffloadDriverMetadata());
    }

    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
        BlobStoreLocation bsKey = this.getBlobStoreLocation(offloadDriverMetadata);
        String readBucket = bsKey.getBucket();
        BlobStore readBlobstore = (BlobStore)this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<ReadHandle> promise = new CompletableFuture<ReadHandle>();
        String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
        String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
        this.scheduler.chooseThread(ledgerId).submit(() -> {
            try {
                promise.complete(BlobStoreBackedReadHandleImpl.open((ScheduledExecutorService)this.scheduler.chooseThread(ledgerId), readBlobstore, readBucket, key, indexKey, DataBlockUtils.VERSION_CHECK, ledgerId, this.config.getReadBufferSizeInBytes()));
            }
            catch (Throwable t) {
                log.error("Failed readOffloaded: ", t);
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext, Map<String, String> offloadDriverMetadata) {
        BlobStoreLocation bsKey = this.getBlobStoreLocation(offloadDriverMetadata);
        String readBucket = bsKey.getBucket();
        BlobStore readBlobstore = (BlobStore)this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<ReadHandle> promise = new CompletableFuture<ReadHandle>();
        List offloadSegmentList = ledgerContext.getOffloadSegmentList();
        LinkedList keys = Lists.newLinkedList();
        LinkedList indexKeys = Lists.newLinkedList();
        offloadSegmentList.forEach(seg -> {
            UUID uuid = new UUID(seg.getUidMsb(), seg.getUidLsb());
            String key = uuid.toString();
            String indexKey = DataBlockUtils.indexBlockOffloadKey(uuid);
            keys.add(key);
            indexKeys.add(indexKey);
        });
        this.scheduler.chooseThread(ledgerId).submit(() -> {
            try {
                promise.complete(BlobStoreBackedReadHandleImplV2.open((ScheduledExecutorService)this.scheduler.chooseThread(ledgerId), readBlobstore, readBucket, keys, indexKeys, DataBlockUtils.VERSION_CHECK, ledgerId, this.config.getReadBufferSizeInBytes()));
            }
            catch (Throwable t) {
                log.error("Failed readOffloaded: ", t);
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
        BlobStoreLocation bsKey = this.getBlobStoreLocation(offloadDriverMetadata);
        String readBucket = bsKey.getBucket(offloadDriverMetadata);
        BlobStore readBlobstore = (BlobStore)this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        this.scheduler.chooseThread(ledgerId).submit(() -> {
            try {
                readBlobstore.removeBlobs(readBucket, (Iterable)ImmutableList.of((Object)DataBlockUtils.dataBlockOffloadKey(ledgerId, uid), (Object)DataBlockUtils.indexBlockOffloadKey(ledgerId, uid)));
                promise.complete(null);
            }
            catch (Throwable t) {
                log.error("Failed delete Blob", t);
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    public CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String, String> offloadDriverMetadata) {
        BlobStoreLocation bsKey = this.getBlobStoreLocation(offloadDriverMetadata);
        String readBucket = bsKey.getBucket(offloadDriverMetadata);
        BlobStore readBlobstore = (BlobStore)this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        this.scheduler.submit(() -> {
            try {
                readBlobstore.removeBlobs(readBucket, (Iterable)ImmutableList.of((Object)uid.toString(), (Object)DataBlockUtils.indexBlockOffloadKey(uid)));
                promise.complete(null);
            }
            catch (Throwable t) {
                log.error("Failed delete Blob", t);
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    public OffloadPoliciesImpl getOffloadPolicies() {
        Properties properties = new Properties();
        properties.putAll(this.config.getConfigProperties());
        return OffloadPoliciesImpl.create((Properties)properties);
    }

    public void close() {
        for (BlobStore readBlobStore : this.blobStores.values()) {
            if (readBlobStore == null) continue;
            readBlobStore.getContext().close();
        }
    }
}

