package org.apache.pulsar.broker.offload.impl;

import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.offload.OffloadIndexBlock;
import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.class */
public class S3ManagedLedgerOffloader implements LedgerOffloader {
    public static final String DRIVER_NAME = "S3";
    static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion";
    static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
    static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
    private final VersionCheck VERSION_CHECK = (str, objectMetadata) -> {
        String str = (String) objectMetadata.getUserMetadata().get(METADATA_FORMAT_VERSION_KEY);
        if (str == null || !str.equals(CURRENT_VERSION)) {
            throw new IOException(String.format("Invalid object version %s for %s, expect %s", str, str, CURRENT_VERSION));
        }
    };
    private final OrderedScheduler scheduler;
    private final AmazonS3 s3client;
    private final String bucket;
    private int maxBlockSize;
    private final int readBufferSize;
    private static final Logger log = LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);
    static final String CURRENT_VERSION = String.valueOf(1);

    /* loaded from: input_file:org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader$VersionCheck.class */
    public interface VersionCheck {
        void check(String str, ObjectMetadata objectMetadata) throws IOException;
    }

    public static S3ManagedLedgerOffloader create(ServiceConfiguration serviceConfiguration, OrderedScheduler orderedScheduler) throws PulsarServerException {
        String s3ManagedLedgerOffloadRegion = serviceConfiguration.getS3ManagedLedgerOffloadRegion();
        String s3ManagedLedgerOffloadBucket = serviceConfiguration.getS3ManagedLedgerOffloadBucket();
        String s3ManagedLedgerOffloadServiceEndpoint = serviceConfiguration.getS3ManagedLedgerOffloadServiceEndpoint();
        int s3ManagedLedgerOffloadMaxBlockSizeInBytes = serviceConfiguration.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
        int s3ManagedLedgerOffloadReadBufferSizeInBytes = serviceConfiguration.getS3ManagedLedgerOffloadReadBufferSizeInBytes();
        if (Strings.isNullOrEmpty(s3ManagedLedgerOffloadRegion) && Strings.isNullOrEmpty(s3ManagedLedgerOffloadServiceEndpoint)) {
            throw new PulsarServerException("Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set if s3 offload enabled");
        }
        if (Strings.isNullOrEmpty(s3ManagedLedgerOffloadBucket)) {
            throw new PulsarServerException("s3ManagedLedgerOffloadBucket cannot be empty if s3 offload enabled");
        }
        if (s3ManagedLedgerOffloadMaxBlockSizeInBytes < 5242880) {
            throw new PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB");
        }
        AmazonS3ClientBuilder standard = AmazonS3ClientBuilder.standard();
        if (Strings.isNullOrEmpty(s3ManagedLedgerOffloadServiceEndpoint)) {
            standard.setRegion(s3ManagedLedgerOffloadRegion);
        } else {
            standard.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(s3ManagedLedgerOffloadServiceEndpoint, s3ManagedLedgerOffloadRegion));
            standard.setPathStyleAccessEnabled(true);
        }
        return new S3ManagedLedgerOffloader((AmazonS3) standard.build(), s3ManagedLedgerOffloadBucket, orderedScheduler, s3ManagedLedgerOffloadMaxBlockSizeInBytes, s3ManagedLedgerOffloadReadBufferSizeInBytes);
    }

    S3ManagedLedgerOffloader(AmazonS3 amazonS3, String str, OrderedScheduler orderedScheduler, int i, int i2) {
        this.s3client = amazonS3;
        this.bucket = str;
        this.scheduler = orderedScheduler;
        this.maxBlockSize = i;
        this.readBufferSize = i2;
    }

    static String dataBlockOffloadKey(long j, UUID uuid) {
        return String.format("%s-ledger-%d", uuid.toString(), Long.valueOf(j));
    }

    static String indexBlockOffloadKey(long j, UUID uuid) {
        return String.format("%s-ledger-%d-index", uuid.toString(), Long.valueOf(j));
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloader
    public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> map) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.scheduler.chooseThread(readHandle.getId()).submit(() -> {
            Throwable th;
            Throwable th2;
            if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
                completableFuture.completeExceptionally(new IllegalArgumentException("An empty or open ledger should never be offloaded"));
                return;
            }
            OffloadIndexBlockBuilder withDataBlockHeaderLength = OffloadIndexBlockBuilder.create().withLedgerMetadata(readHandle.getLedgerMetadata()).withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
            String dataBlockOffloadKey = dataBlockOffloadKey(readHandle.getId(), uuid);
            String indexBlockOffloadKey = indexBlockOffloadKey(readHandle.getId(), uuid);
            ObjectMetadata objectMetadata = new ObjectMetadata();
            addVersionInfo(objectMetadata);
            try {
                InitiateMultipartUploadResult initiateMultipartUpload = this.s3client.initiateMultipartUpload(new InitiateMultipartUploadRequest(this.bucket, dataBlockOffloadKey, objectMetadata));
                long j = 0;
                try {
                    long j2 = 0;
                    int i = 1;
                    long j3 = 0;
                    LinkedList linkedList = new LinkedList();
                    while (true) {
                        if (j2 > readHandle.getLastAddConfirmed()) {
                            break;
                        }
                        int calculateBlockSize = BlockAwareSegmentInputStreamImpl.calculateBlockSize(this.maxBlockSize, readHandle, j2, j3);
                        th = null;
                        try {
                            BlockAwareSegmentInputStreamImpl blockAwareSegmentInputStreamImpl = new BlockAwareSegmentInputStreamImpl(readHandle, j2, calculateBlockSize);
                            try {
                                linkedList.add(this.s3client.uploadPart(new UploadPartRequest().withBucketName(this.bucket).withKey(dataBlockOffloadKey).withUploadId(initiateMultipartUpload.getUploadId()).withInputStream(blockAwareSegmentInputStreamImpl).withPartSize(calculateBlockSize).withPartNumber(i)).getPartETag());
                                withDataBlockHeaderLength.addBlock(j2, i, calculateBlockSize);
                                if (blockAwareSegmentInputStreamImpl.getEndEntryId() != -1) {
                                    j2 = blockAwareSegmentInputStreamImpl.getEndEntryId() + 1;
                                    j3 += blockAwareSegmentInputStreamImpl.getBlockEntryBytesCount();
                                    i++;
                                    if (blockAwareSegmentInputStreamImpl != null) {
                                        blockAwareSegmentInputStreamImpl.close();
                                    }
                                    j += calculateBlockSize;
                                }
                            } finally {
                                if (blockAwareSegmentInputStreamImpl != null) {
                                    blockAwareSegmentInputStreamImpl.close();
                                }
                            }
                        } finally {
                        }
                    }
                    this.s3client.completeMultipartUpload(new CompleteMultipartUploadRequest().withBucketName(this.bucket).withKey(dataBlockOffloadKey).withUploadId(initiateMultipartUpload.getUploadId()).withPartETags(linkedList));
                    th = null;
                    try {
                        try {
                            OffloadIndexBlock build = withDataBlockHeaderLength.withDataObjectLength(j).build();
                            try {
                                OffloadIndexBlock.IndexInputStream stream = build.toStream();
                                try {
                                    ObjectMetadata objectMetadata2 = new ObjectMetadata();
                                    objectMetadata2.setContentLength(stream.getStreamSize());
                                    addVersionInfo(objectMetadata2);
                                    this.s3client.putObject(new PutObjectRequest(this.bucket, indexBlockOffloadKey, stream, objectMetadata2));
                                    completableFuture.complete(null);
                                    if (stream != null) {
                                        stream.close();
                                    }
                                    if (build != null) {
                                        build.close();
                                    }
                                } catch (Throwable th3) {
                                    if (stream != null) {
                                        stream.close();
                                    }
                                    throw th3;
                                }
                            } catch (Throwable th4) {
                                if (0 == 0) {
                                    th = th4;
                                } else if (null != th4) {
                                    th.addSuppressed(th4);
                                }
                                if (build != null) {
                                    build.close();
                                }
                                throw th;
                            }
                        } finally {
                        }
                    } catch (Throwable th5) {
                        try {
                            this.s3client.deleteObject(this.bucket, dataBlockOffloadKey);
                        } catch (Throwable th6) {
                            log.error("Failed deleteObject in bucket - {} with key - {}.", new Object[]{this.bucket, dataBlockOffloadKey, th6});
                        }
                        completableFuture.completeExceptionally(th5);
                    }
                } catch (Throwable th7) {
                    try {
                        this.s3client.abortMultipartUpload(new AbortMultipartUploadRequest(this.bucket, dataBlockOffloadKey, initiateMultipartUpload.getUploadId()));
                    } catch (Throwable th8) {
                        log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", new Object[]{this.bucket, dataBlockOffloadKey, initiateMultipartUpload.getUploadId(), th8});
                    }
                    completableFuture.completeExceptionally(th7);
                }
            } catch (Throwable th9) {
                completableFuture.completeExceptionally(th9);
            }
        });
        return completableFuture;
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloader
    public CompletableFuture<ReadHandle> readOffloaded(long j, UUID uuid) {
        CompletableFuture<ReadHandle> completableFuture = new CompletableFuture<>();
        String dataBlockOffloadKey = dataBlockOffloadKey(j, uuid);
        String indexBlockOffloadKey = indexBlockOffloadKey(j, uuid);
        this.scheduler.chooseThread(j).submit(() -> {
            try {
                completableFuture.complete(S3BackedReadHandleImpl.open(this.scheduler.chooseThread(j), this.s3client, this.bucket, dataBlockOffloadKey, indexBlockOffloadKey, this.VERSION_CHECK, j, this.readBufferSize));
            } catch (Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    private static void addVersionInfo(ObjectMetadata objectMetadata) {
        objectMetadata.getUserMetadata().put(METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION);
        objectMetadata.getUserMetadata().put(METADATA_SOFTWARE_VERSION_KEY, PulsarBrokerVersionStringUtils.getNormalizedVersionString());
        objectMetadata.getUserMetadata().put(METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha());
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloader
    public CompletableFuture<Void> deleteOffloaded(long j, UUID uuid) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.scheduler.chooseThread(j).submit(() -> {
            try {
                this.s3client.deleteObjects(new DeleteObjectsRequest(this.bucket).withKeys(new String[]{dataBlockOffloadKey(j, uuid), indexBlockOffloadKey(j, uuid)}));
                completableFuture.complete(null);
            } catch (Throwable th) {
                log.error("Failed delete s3 Object ", th);
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }
}
