/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlockAwareSegmentInputStreamImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.BlobStoreLocation;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobBuilder;
import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.domain.MultipartUpload;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.io.payloads.InputStreamPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobStoreManagedLedgerOffloader
implements LedgerOffloader {
    private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class);
    private final OrderedScheduler scheduler;
    private final TieredStorageConfiguration config;
    private final Location writeLocation;
    private final Map<String, String> userMetadata;
    private final ConcurrentMap<BlobStoreLocation, BlobStore> blobStores = new ConcurrentHashMap<BlobStoreLocation, BlobStore>();

    public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config, Map<String, String> userMetadata, OrderedScheduler scheduler) throws IOException {
        return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata);
    }

    BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, OrderedScheduler scheduler, Map<String, String> userMetadata) {
        this.scheduler = scheduler;
        this.userMetadata = userMetadata;
        this.config = config;
        this.writeLocation = !Strings.isNullOrEmpty((String)config.getRegion()) ? new LocationBuilder().scope(LocationScope.REGION).id(config.getRegion()).description(config.getRegion()).build() : null;
        log.info("Constructor offload driver: {}, host: {}, container: {}, region: {} ", new Object[]{config.getProvider().getDriver(), config.getServiceEndpoint(), config.getBucket(), config.getRegion()});
        this.blobStores.putIfAbsent(config.getBlobStoreLocation(), config.getBlobStore());
        log.info("The ledger offloader was created.");
    }

    public String getOffloadDriverName() {
        return this.config.getDriver();
    }

    public Map<String, String> getOffloadDriverMetadata() {
        return this.config.getOffloadDriverMetadata();
    }

    public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
        BlobStore writeBlobStore = (BlobStore)this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        this.scheduler.chooseThread(readHandle.getId()).submit(() -> {
            if (readHandle.getLength() == 0L || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0L) {
                promise.completeExceptionally(new IllegalArgumentException("An empty or open ledger should never be offloaded"));
                return;
            }
            OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create().withLedgerMetadata(readHandle.getLedgerMetadata()).withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
            String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(readHandle.getId(), uuid);
            String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(readHandle.getId(), uuid);
            MultipartUpload mpu = null;
            ArrayList parts = Lists.newArrayList();
            try {
                BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey);
                DataBlockUtils.addVersionInfo(blobBuilder, this.userMetadata);
                Blob blob = blobBuilder.build();
                mpu = writeBlobStore.initiateMultipartUpload(this.config.getBucket(), (BlobMetadata)blob.getMetadata(), new PutOptions());
            }
            catch (Throwable t) {
                promise.completeExceptionally(t);
                return;
            }
            long dataObjectLength = 0L;
            try {
                long startEntry = 0L;
                int partId = 1;
                long entryBytesWritten = 0L;
                while (startEntry <= readHandle.getLastAddConfirmed()) {
                    int blockSize = BlockAwareSegmentInputStreamImpl.calculateBlockSize(this.config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten);
                    try (BlockAwareSegmentInputStreamImpl blockStream = new BlockAwareSegmentInputStreamImpl(readHandle, startEntry, blockSize);){
                        InputStreamPayload partPayload = Payloads.newInputStreamPayload((InputStream)blockStream);
                        partPayload.getContentMetadata().setContentLength(Long.valueOf(blockSize));
                        partPayload.getContentMetadata().setContentType("application/octet-stream");
                        parts.add(writeBlobStore.uploadMultipartPart(mpu, partId, (Payload)partPayload));
                        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", new Object[]{this.config.getBucket(), dataBlockKey, partId, mpu.id()});
                        indexBuilder.addBlock(startEntry, partId, blockSize);
                        if (((BlockAwareSegmentInputStream)blockStream).getEndEntryId() == -1L) break;
                        startEntry = ((BlockAwareSegmentInputStream)blockStream).getEndEntryId() + 1L;
                        entryBytesWritten += (long)((BlockAwareSegmentInputStream)blockStream).getBlockEntryBytesCount();
                        ++partId;
                    }
                    dataObjectLength += (long)blockSize;
                }
                writeBlobStore.completeMultipartUpload(mpu, (List)parts);
                mpu = null;
            }
            catch (Throwable t) {
                try {
                    if (mpu != null) {
                        writeBlobStore.abortMultipartUpload(mpu);
                    }
                }
                catch (Throwable throwable) {
                    log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", new Object[]{this.config.getBucket(), dataBlockKey, mpu.id(), throwable});
                }
                promise.completeExceptionally(t);
                return;
            }
            try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
                 OffloadIndexBlock.IndexInputStream indexStream = index.toStream();){
                BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey);
                DataBlockUtils.addVersionInfo(blobBuilder, this.userMetadata);
                InputStreamPayload indexPayload = Payloads.newInputStreamPayload((InputStream)indexStream);
                indexPayload.getContentMetadata().setContentLength(Long.valueOf(indexStream.getStreamSize()));
                indexPayload.getContentMetadata().setContentType("application/octet-stream");
                Blob blob = blobBuilder.payload((Payload)indexPayload).contentLength(indexStream.getStreamSize()).build();
                writeBlobStore.putBlob(this.config.getBucket(), blob);
                promise.complete(null);
            }
            catch (Throwable t) {
                try {
                    writeBlobStore.removeBlob(this.config.getBucket(), dataBlockKey);
                }
                catch (Throwable throwable) {
                    log.error("Failed deleteObject in bucket - {} with key - {}.", new Object[]{this.config.getBucket(), dataBlockKey, throwable});
                }
                promise.completeExceptionally(t);
                return;
            }
        });
        return promise;
    }

    private BlobStoreLocation getBlobStoreLocation(Map<String, String> offloadDriverMetadata) {
        return !offloadDriverMetadata.isEmpty() ? new BlobStoreLocation(offloadDriverMetadata) : new BlobStoreLocation(this.getOffloadDriverMetadata());
    }

    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
        BlobStoreLocation bsKey = this.getBlobStoreLocation(offloadDriverMetadata);
        String readBucket = bsKey.getBucket();
        BlobStore readBlobstore = (BlobStore)this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<ReadHandle> promise = new CompletableFuture<ReadHandle>();
        String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
        String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
        this.scheduler.chooseThread(ledgerId).submit(() -> {
            try {
                promise.complete(BlobStoreBackedReadHandleImpl.open((ScheduledExecutorService)this.scheduler.chooseThread(ledgerId), readBlobstore, readBucket, key, indexKey, DataBlockUtils.VERSION_CHECK, ledgerId, this.config.getReadBufferSizeInBytes()));
            }
            catch (Throwable t) {
                log.error("Failed readOffloaded: ", t);
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
        BlobStoreLocation bsKey = this.getBlobStoreLocation(offloadDriverMetadata);
        String readBucket = bsKey.getBucket(offloadDriverMetadata);
        BlobStore readBlobstore = (BlobStore)this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        this.scheduler.chooseThread(ledgerId).submit(() -> {
            try {
                readBlobstore.removeBlobs(readBucket, (Iterable)ImmutableList.of((Object)DataBlockUtils.dataBlockOffloadKey(ledgerId, uid), (Object)DataBlockUtils.indexBlockOffloadKey(ledgerId, uid)));
                promise.complete(null);
            }
            catch (Throwable t) {
                log.error("Failed delete Blob", t);
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    public OffloadPolicies getOffloadPolicies() {
        Properties properties = new Properties();
        properties.putAll(this.config.getConfigProperties());
        return OffloadPolicies.create((Properties)properties);
    }

    public void close() {
        for (BlobStore readBlobStore : this.blobStores.values()) {
            if (readBlobStore == null) continue;
            readBlobStore.getContext().close();
        }
    }
}

