package kafka.tier.store;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
import com.azure.storage.blob.models.DownloadRetryOptions;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.options.BlobBeginCopyOptions;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.AzureBlockBlobTierObjectStoreConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.ObjectType;
import kafka.tier.store.objects.metadata.HealthMetadata;
import kafka.tier.store.objects.metadata.ObjectMetadata;
import kafka.tier.store.objects.metadata.ObjectStoreMetadata;
import kafka.utils.CoreUtils;
import kafka.utils.Throttler;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/* loaded from: input_file:kafka/tier/store/AzureBlockBlobTierObjectStore.class */
public class AzureBlockBlobTierObjectStore extends TierObjectStore {
    private static final Logger log = LoggerFactory.getLogger(AzureBlockBlobTierObjectStore.class);
    private final BlobServiceClient blobServiceClient;
    private final BlobContainerClient blobContainerClient;
    private final BlobServiceAsyncClient blobServiceAsyncClient;
    public final BlobContainerAsyncClient blobContainerAsyncClient;
    private final Optional<String> clusterIdOpt;
    private final Optional<Integer> brokerIdOpt;
    private final String container;
    private final String prefix;
    private final int drainThreshold;
    private static final long DEFAULT_TIMEOUT_IN_SECS = 30;

    /* loaded from: input_file:kafka/tier/store/AzureBlockBlobTierObjectStore$AzureBlockBlobTierObjectStoreResponse.class */
    private static class AzureBlockBlobTierObjectStoreResponse implements TierObjectStoreResponse {
        private final InputStream inputStream;

        AzureBlockBlobTierObjectStoreResponse(InputStream inputStream, int i, long j) {
            this.inputStream = new AzureBlockBlobAutoAbortingInputStream(inputStream, i, j);
        }

        @Override // kafka.tier.store.TierObjectStoreResponse, java.lang.AutoCloseable
        public void close() throws IOException {
            this.inputStream.close();
        }

        @Override // kafka.tier.store.TierObjectStoreResponse
        public InputStream getInputStream() {
            return this.inputStream;
        }
    }

    public AzureBlockBlobTierObjectStore(AzureBlockBlobTierObjectStoreConfig azureBlockBlobTierObjectStoreConfig) {
        this.clusterIdOpt = azureBlockBlobTierObjectStoreConfig.clusterIdOpt;
        this.brokerIdOpt = azureBlockBlobTierObjectStoreConfig.brokerIdOpt;
        this.container = azureBlockBlobTierObjectStoreConfig.container;
        this.prefix = azureBlockBlobTierObjectStoreConfig.azureBlobPrefix;
        this.drainThreshold = azureBlockBlobTierObjectStoreConfig.drainThreshold;
        this.blobServiceClient = createServiceClient(azureBlockBlobTierObjectStoreConfig);
        this.blobContainerClient = createContainerClient(this.blobServiceClient, azureBlockBlobTierObjectStoreConfig);
        this.blobServiceAsyncClient = createServiceAsyncClient(azureBlockBlobTierObjectStoreConfig);
        this.blobContainerAsyncClient = createContainerAsyncClient(this.blobServiceAsyncClient, azureBlockBlobTierObjectStoreConfig);
    }

    public AzureBlockBlobTierObjectStore(AzureBlockBlobTierObjectStoreConfig azureBlockBlobTierObjectStoreConfig, BlobServiceClient blobServiceClient, BlobContainerClient blobContainerClient, BlobServiceAsyncClient blobServiceAsyncClient, BlobContainerAsyncClient blobContainerAsyncClient) {
        this.clusterIdOpt = azureBlockBlobTierObjectStoreConfig.clusterIdOpt;
        this.brokerIdOpt = azureBlockBlobTierObjectStoreConfig.brokerIdOpt;
        this.container = azureBlockBlobTierObjectStoreConfig.container;
        this.prefix = azureBlockBlobTierObjectStoreConfig.azureBlobPrefix;
        this.drainThreshold = azureBlockBlobTierObjectStoreConfig.drainThreshold;
        this.blobServiceClient = blobServiceClient;
        this.blobContainerClient = blobContainerClient;
        this.blobServiceAsyncClient = blobServiceAsyncClient;
        this.blobContainerAsyncClient = blobContainerAsyncClient;
    }

    @Override // kafka.tier.store.TierObjectStore
    public String keyPrefix() {
        return this.prefix;
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectStore.Backend getBackend() {
        return TierObjectStore.Backend.AzureBlockBlob;
    }

    @Override // kafka.tier.store.TierObjectStore
    public Map<String, List<VersionInformation>> listObject(String str, boolean z) {
        HashMap hashMap = new HashMap();
        try {
            Iterator it = this.blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(str).setDetails(new BlobListDetails().setRetrieveVersions(z)), Duration.ofSeconds(DEFAULT_TIMEOUT_IN_SECS)).iterator();
            while (it.hasNext()) {
                BlobItem blobItem = (BlobItem) it.next();
                hashMap.putIfAbsent(blobItem.getName(), new ArrayList());
                if (z) {
                    ((List) hashMap.get(blobItem.getName())).add(new VersionInformation(blobItem.getVersionId()));
                }
            }
            if (log.isDebugEnabled()) {
                StringBuilder sb = new StringBuilder();
                hashMap.forEach((str2, list) -> {
                    sb.append("[").append(str2).append("->").append(Arrays.toString(list.toArray())).append("] ");
                });
                log.debug("TierObjectStore listObjects versions: " + z + " prefix: " + str + " " + ((Object) sb));
            }
            return hashMap;
        } catch (RuntimeException e) {
            throw new TierObjectStoreRetriableException(String.format("Failed to list objects with keyPrefix: %s, getVersionInfo: %b", str, Boolean.valueOf(z)), e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException(String.format("Failed to list objects with keyPrefix: %s, getVersionInfo: %b", str, Boolean.valueOf(z)), e2);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:62:0x00ff, code lost:
    
        throw new kafka.tier.exceptions.TierObjectStoreRetriableException("Encountered an exception when fetching snapshot from object store.", r10);
     */
    @Override // kafka.tier.store.TierObjectStore
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public java.nio.ByteBuffer getSnapshot(kafka.tier.store.objects.metadata.ObjectStoreMetadata r6, kafka.tier.store.objects.FragmentType r7, int r8) {
        /*
            Method dump skipped, instructions count: 259
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.tier.store.AzureBlockBlobTierObjectStore.getSnapshot(kafka.tier.store.objects.metadata.ObjectStoreMetadata, kafka.tier.store.objects.FragmentType, int):java.nio.ByteBuffer");
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectStoreResponse getObject(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType, Long l, Long l2, VersionInformation versionInformation) {
        String keyPath = keyPath(objectStoreMetadata, objectType);
        BlobClient blobVersionClient = versionInformation != null ? this.blobContainerClient.getBlobVersionClient(keyPath, versionInformation.getVersionId()) : this.blobContainerClient.getBlobClient(keyPath);
        checkOffsets(l, l2);
        log.debug("Fetching object from {}/{}, with range of {} to {}", new Object[]{this.container, keyPath, l, l2});
        long longValue = l != null ? l.longValue() : 0L;
        BlobRange blobRange = l2 != null ? new BlobRange(longValue, Long.valueOf(l2.longValue() - longValue)) : new BlobRange(longValue);
        try {
            InputStream inputStreamFromBlobClient = getInputStreamFromBlobClient(blobVersionClient, blobRange);
            Long count = blobRange.getCount();
            return new AzureBlockBlobTierObjectStoreResponse(inputStreamFromBlobClient, this.drainThreshold, count == null ? TierObjectMetadata.DEFAULT_STATE_CHANGE_TIMESTAMP : count.longValue());
        } catch (Exception e) {
            throw convertFetchException(e, keyPath, objectStoreMetadata, objectType, l, l2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    protected CompletableFuture<TierObjectStoreResponse> getObjectAsync(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType, Long l, Long l2, VersionInformation versionInformation) {
        CompletableFuture<TierObjectStoreResponse> completableFuture = new CompletableFuture<>();
        try {
            String keyPath = keyPath(objectStoreMetadata, objectType);
            BlobAsyncClient blobVersionAsyncClient = versionInformation != null ? this.blobContainerAsyncClient.getBlobVersionAsyncClient(keyPath, versionInformation.getVersionId()) : this.blobContainerAsyncClient.getBlobAsyncClient(keyPath);
            checkOffsets(l, l2);
            log.debug("Fetching object async from {}/{}, with range of {} to {}", new Object[]{this.container, keyPath, l, l2});
            long longValue = l == null ? 0L : l.longValue();
            BlobRange blobRange = l2 != null ? new BlobRange(longValue, Long.valueOf(l2.longValue() - longValue)) : new BlobRange(longValue);
            Long count = blobRange.getCount();
            blobVersionAsyncClient.getBlockBlobAsyncClient().downloadStreamWithResponse(blobRange, (DownloadRetryOptions) null, new BlobRequestConditions(), false).flatMap(blobDownloadAsyncResponse -> {
                return count == null ? FluxUtil.collectBytesInByteBufferStream((Flux) blobDownloadAsyncResponse.getValue()) : FluxUtil.collectBytesInByteBufferStream((Flux) blobDownloadAsyncResponse.getValue(), count.intValue());
            }).subscribe(bArr -> {
                completableFuture.complete(new AzureBlockBlobTierObjectStoreResponse(new ByteArrayInputStream(bArr), this.drainThreshold, bArr.length));
            }, th -> {
                completableFuture.completeExceptionally(convertFetchException(th, keyPath, objectStoreMetadata, objectType, l, l2));
            });
        } catch (Exception e) {
            log.error("Failed to send async fetch request, metadata: {}, type: {}", new Object[]{objectStoreMetadata, objectType, e});
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    private RuntimeException convertFetchException(Throwable th, String str, ObjectStoreMetadata objectStoreMetadata, ObjectType objectType, Long l, Long l2) {
        return th instanceof BlobStorageException ? BlobErrorCode.BLOB_NOT_FOUND.equals(((BlobStorageException) th).getErrorCode()) ? new TierObjectStoreFatalException(String.format("Failed to fetch object from %s, metadata: %s type: %s range %s-%s.Object not found.", str, objectStoreMetadata, objectType, l, l2), th) : new TierObjectStoreRetriableException(String.format("Failed to fetch object from %s, metadata: %s type: %s range %s-%s", str, objectStoreMetadata, objectType, l, l2), th) : th instanceof RuntimeException ? new TierObjectStoreRetriableException(String.format("Runtime exception when fetching object from %s, metadata: %s type: %s range %s-%s", str, objectStoreMetadata, objectType, l, l2), th) : new TierObjectStoreFatalException(String.format("Unknown exception when fetching object from %s, metadata: %s type: %s range %s-%s", str, objectStoreMetadata, objectType, l, l2), th);
    }

    private static void checkOffsets(Long l, Long l2) {
        if (l != null && l2 != null && l.longValue() > l2.longValue()) {
            throw new IllegalStateException(String.format("Invalid range of byteOffsetStart=%d and byteOffsetEndExclusive=%d", l, l2));
        }
        if (l == null && l2 != null) {
            throw new IllegalStateException(String.format("Cannot specify a byteOffsetEndExclusive=%d without specifying a byteOffsetStart", l2));
        }
    }

    InputStream getInputStreamFromBlobClient(BlobClient blobClient, BlobRange blobRange) {
        return blobClient.getBlockBlobClient().openInputStream(blobRange, new BlobRequestConditions());
    }

    @Override // kafka.tier.store.TierObjectStore
    public OpaqueData prepPutSegment() throws TierObjectStoreRetriableException, IOException {
        return OpaqueData.ZEROED;
    }

    @Override // kafka.tier.store.TierObjectStore
    public void putSegment(ObjectMetadata objectMetadata, File file, File file2, File file3, Optional<File> optional, Optional<ByteBuffer> optional2, Optional<ByteBuffer> optional3, Optional<Throttler> optional4) {
        Map<String, String> objectMetadata2 = objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            if (optional4.isPresent()) {
                putFileWithThrottling(keyPath(objectMetadata, ObjectType.SEGMENT), objectMetadata2, file, optional4.get());
                putFileWithThrottling(keyPath(objectMetadata, ObjectType.OFFSET_INDEX), objectMetadata2, file2, optional4.get());
                putFileWithThrottling(keyPath(objectMetadata, ObjectType.TIMESTAMP_INDEX), objectMetadata2, file3, optional4.get());
                if (optional.isPresent()) {
                    putFileWithThrottling(keyPath(objectMetadata, ObjectType.PRODUCER_STATE), objectMetadata2, optional.get(), optional4.get());
                }
            } else {
                putFile(keyPath(objectMetadata, ObjectType.SEGMENT), objectMetadata2, file);
                putFile(keyPath(objectMetadata, ObjectType.OFFSET_INDEX), objectMetadata2, file2);
                putFile(keyPath(objectMetadata, ObjectType.TIMESTAMP_INDEX), objectMetadata2, file3);
                optional.ifPresent(file4 -> {
                    putFile(keyPath(objectMetadata, ObjectType.PRODUCER_STATE), objectMetadata2, file4);
                });
            }
            if (optional2.isPresent()) {
                putBuf(keyPath(objectMetadata, ObjectType.TRANSACTION_INDEX), objectMetadata2, optional2.get());
            }
            if (optional3.isPresent()) {
                putBuf(keyPath(objectMetadata, ObjectType.EPOCH_STATE), objectMetadata2, optional3.get());
            }
        } catch (RuntimeException e) {
            throw new TierObjectStoreRetriableException("Failed to upload segment " + objectMetadata, e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment: " + objectMetadata, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void putInMemorySegment(ObjectMetadata objectMetadata, File file, File file2, File file3, Optional<ByteBuffer> optional, Optional<ByteBuffer> optional2, Optional<ByteBuffer> optional3) {
        Map<String, String> objectMetadata2 = objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            putFile(keyPath(objectMetadata, ObjectType.SEGMENT), objectMetadata2, file);
            putFile(keyPath(objectMetadata, ObjectType.OFFSET_INDEX), objectMetadata2, file2);
            putFile(keyPath(objectMetadata, ObjectType.TIMESTAMP_INDEX), objectMetadata2, file3);
            if (optional.isPresent()) {
                putBuf(keyPath(objectMetadata, ObjectType.PRODUCER_STATE), objectMetadata2, optional.get());
            }
            if (optional2.isPresent()) {
                putBuf(keyPath(objectMetadata, ObjectType.TRANSACTION_INDEX), objectMetadata2, optional2.get());
            }
            if (optional3.isPresent()) {
                putBuf(keyPath(objectMetadata, ObjectType.EPOCH_STATE), objectMetadata2, optional3.get());
            }
        } catch (RuntimeException e) {
            throw new TierObjectStoreRetriableException("Failed to upload segment " + objectMetadata, e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment: " + objectMetadata, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public String putObject(ObjectStoreMetadata objectStoreMetadata, File file, ObjectType objectType) {
        Map<String, String> objectMetadata = objectStoreMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            String keyPath = keyPath(objectStoreMetadata, objectType);
            putFile(keyPath, objectMetadata, file);
            return keyPath;
        } catch (RuntimeException e) {
            throw new TierObjectStoreRetriableException(String.format("Failed to upload object %s, file %s, type %s", objectStoreMetadata, file, objectType), e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException(String.format("Failed to upload object %s, file %s, type %s", objectStoreMetadata, file, objectType), e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public String putBuffer(ObjectStoreMetadata objectStoreMetadata, ByteBuffer byteBuffer, ObjectType objectType) {
        Map<String, String> objectMetadata = objectStoreMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            String keyPath = keyPath(objectStoreMetadata, objectType);
            putBuf(keyPath, objectMetadata, byteBuffer);
            return keyPath;
        } catch (RuntimeException e) {
            throw new TierObjectStoreRetriableException(String.format("Failed to upload object %s, buffer %s, type %s", objectStoreMetadata, byteBuffer, objectType), e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException(String.format("Failed to upload object %s, buffer %s, type %s", objectStoreMetadata, byteBuffer, objectType), e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void restoreObjectByCopy(ObjectMetadata objectMetadata, String str, VersionInformation versionInformation) {
        String versionId = versionInformation.getVersionId();
        Map<String, String> objectMetadata2 = objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            String blobUrl = this.blobContainerClient.getBlobVersionClient(str, versionId).getBlobUrl();
            log.debug(String.format("Azure restore key: %s lastLiveVersionId: %s copySource: %s", str, versionId, blobUrl));
            log.debug(String.format("Azure restore key: %s response status: %s", str, this.blobContainerClient.getBlobClient(str).beginCopy(new BlobBeginCopyOptions(blobUrl).setMetadata(objectMetadata2).setTier(AccessTier.HOT)).waitUntil(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED).getStatus()));
        } catch (RuntimeException e) {
            throw new TierObjectStoreRetriableException(String.format("Failed to restore object %s (version: %s)", str, versionId), e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException(String.format("Unknown exception when restoring object %s (version: %s)", str, versionId), e2);
        } catch (BlobStorageException e3) {
            if (!BlobErrorCode.BLOB_NOT_FOUND.equals(e3.getErrorCode())) {
                throw new TierObjectStoreRetriableException(String.format("Failed to restore object %s (version: %s)", str, versionId), e3);
            }
            throw new TierObjectStoreFatalException(String.format("Failed to restore object %s (version: %s) as blob not found", str, versionId), e3);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void deleteSegment(ObjectMetadata objectMetadata) {
        for (String str : keysForSegment(objectMetadata)) {
            try {
                BlobClient blobClient = this.blobContainerClient.getBlobClient(str);
                log.debug("Deleting " + str);
                blobClient.delete();
            } catch (RuntimeException e) {
                throw new TierObjectStoreRetriableException("Failed to delete file " + str, e);
            } catch (Exception e2) {
                throw new TierObjectStoreFatalException("Unknown exception when deleting segment " + objectMetadata, e2);
            } catch (BlobStorageException e3) {
                if (!BlobErrorCode.BLOB_NOT_FOUND.equals(e3.getErrorCode())) {
                    throw new TierObjectStoreRetriableException("Failed to delete file " + str, e3);
                }
            }
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void deleteVersions(List<TierObjectStore.KeyAndVersion> list) {
        Duration ofSeconds = Duration.ofSeconds(DEFAULT_TIMEOUT_IN_SECS);
        for (TierObjectStore.KeyAndVersion keyAndVersion : list) {
            try {
                log.debug("TierObjectStore delete request for " + keyAndVersion + " statusCode: " + (keyAndVersion.versionId() == null ? this.blobContainerClient.getBlobClient(keyAndVersion.key()) : this.blobContainerClient.getBlobVersionClient(keyAndVersion.key(), keyAndVersion.versionId())).deleteWithResponse((DeleteSnapshotsOptionType) null, (BlobRequestConditions) null, ofSeconds, Context.NONE).getStatusCode());
            } catch (RuntimeException e) {
                throw new TierObjectStoreRetriableException("Failed to delete versioned objects: " + keyAndVersion, e);
            } catch (Exception e2) {
                throw new TierObjectStoreFatalException("Unknown exception when deleting versioned objects: " + keyAndVersion, e2);
            }
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectAttribute objectExists(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType) throws IOException, TierObjectStoreRetriableException {
        String keyPath = keyPath(objectStoreMetadata, objectType);
        TierObjectAttribute tierObjectAttribute = new TierObjectAttribute(false);
        try {
            BlobClient blobClient = this.blobContainerClient.getBlobClient(keyPath);
            if (blobClient.exists().booleanValue()) {
                tierObjectAttribute.size = blobClient.getProperties().getBlobSize();
                tierObjectAttribute.exist = true;
                log.trace("objectExists at {}/{} with size {}", new Object[]{this.container, keyPath, Long.valueOf(tierObjectAttribute.size)});
            }
            return tierObjectAttribute;
        } catch (RuntimeException e) {
            throw new TierObjectStoreRetriableException("Failed to check for object attributes with metadata " + objectStoreMetadata + " @ " + keyPath, e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception while checking for object attributes with metadata " + objectStoreMetadata + " @ " + keyPath, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public BucketHealthResult checkBucketHealth() {
        try {
            ByteBuffer timeHealthPayload = TierObjectStoreUtils.timeHealthPayload();
            HealthMetadata healthMetadata = new HealthMetadata(this.clusterIdOpt, this.brokerIdOpt);
            String objectPath = healthMetadata.toFragmentLocation(this.prefix, FragmentType.HEALTH_CHECK).get().objectPath();
            putBuf(objectPath, healthMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt), timeHealthPayload);
            InputStream inputStream = getObjectStoreFragment(healthMetadata, FragmentType.HEALTH_CHECK).getInputStream();
            Throwable th = null;
            while (true) {
                try {
                    try {
                        int read = inputStream.read();
                        if (read <= 0) {
                            break;
                        }
                        log.trace("Bucket probe read {} bytes", Integer.valueOf(read));
                    } finally {
                    }
                } finally {
                }
            }
            if (inputStream != null) {
                if (0 != 0) {
                    try {
                        inputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    inputStream.close();
                }
            }
            BlobClient blobClient = this.blobContainerClient.getBlobClient(objectPath);
            log.debug("Deleting " + objectPath);
            blobClient.delete();
            return BucketHealthResult.HEALTHY;
        } catch (Exception e) {
            log.error("Bucket health checker returned unclassified error", e);
            return BucketHealthResult.UNCLASSIFIED;
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void close() {
    }

    private void putFile(String str, Map<String, String> map, File file) {
        this.blobContainerClient.getBlobClient(str).uploadFromFile(file.getPath(), new ParallelTransferOptions(), new BlobHttpHeaders(), map, AccessTier.HOT, new BlobRequestConditions(), (Duration) null);
        log.debug("Uploaded file to {}/{}", this.container, str);
    }

    private void putFileWithThrottling(String str, Map<String, String> map, File file, Throttler throttler) throws IOException {
        BlobClient blobClient = this.blobContainerClient.getBlobClient(str);
        BufferedInputStream bufferedInputStream = new BufferedInputStream(new ThrottledFileInputStream(file, throttler));
        Throwable th = null;
        try {
            try {
                blobClient.getBlockBlobClient().uploadWithResponse(bufferedInputStream, file.length(), new BlobHttpHeaders(), map, AccessTier.HOT, (byte[]) null, new BlobRequestConditions(), (Duration) null, (Context) null);
                if (bufferedInputStream != null) {
                    if (0 != 0) {
                        try {
                            bufferedInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedInputStream.close();
                    }
                }
                log.debug("Uploaded file with throttling to {}/{}", this.container, str);
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedInputStream != null) {
                if (th != null) {
                    try {
                        bufferedInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedInputStream.close();
                }
            }
            throw th3;
        }
    }

    public void putBuf(String str, Map<String, String> map, ByteBuffer byteBuffer) throws IOException {
        BlobClient blobClient = this.blobContainerClient.getBlobClient(str);
        byte[] md5hash = CoreUtils.md5hash(byteBuffer);
        BufferedInputStream bufferedInputStream = new BufferedInputStream(new ByteBufferInputStream(byteBuffer.duplicate()));
        Throwable th = null;
        try {
            try {
                blobClient.getBlockBlobClient().uploadWithResponse(bufferedInputStream, byteBuffer.limit() - byteBuffer.position(), new BlobHttpHeaders().setContentMd5(md5hash), map, AccessTier.HOT, md5hash, new BlobRequestConditions(), (Duration) null, (Context) null);
                if (bufferedInputStream != null) {
                    if (0 != 0) {
                        try {
                            bufferedInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedInputStream.close();
                    }
                }
                log.debug("Uploaded buffer to {}/{}", this.container, str);
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedInputStream != null) {
                if (th != null) {
                    try {
                        bufferedInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedInputStream.close();
                }
            }
            throw th3;
        }
    }

    private List<String> keysForSegment(ObjectMetadata objectMetadata) {
        ArrayList arrayList = new ArrayList();
        for (ObjectType objectType : TierObjectStore.getObjectTypesPerSegment()) {
            switch (objectType) {
                case TRANSACTION_INDEX:
                    if (objectMetadata.hasAbortedTxns()) {
                        arrayList.add(keyPath(objectMetadata, objectType));
                        break;
                    } else {
                        break;
                    }
                case EPOCH_STATE:
                    if (objectMetadata.hasEpochState()) {
                        arrayList.add(keyPath(objectMetadata, objectType));
                        break;
                    } else {
                        break;
                    }
                case PRODUCER_STATE:
                    if (objectMetadata.hasProducerState()) {
                        arrayList.add(keyPath(objectMetadata, objectType));
                        break;
                    } else {
                        break;
                    }
                default:
                    arrayList.add(keyPath(objectMetadata, objectType));
                    break;
            }
        }
        return arrayList;
    }

    private static BlobServiceClientBuilder createServiceClientBuilder(AzureBlockBlobTierObjectStoreConfig azureBlockBlobTierObjectStoreConfig) {
        BlobServiceClientBuilder credential;
        if (azureBlockBlobTierObjectStoreConfig.azureCredentialsConfig.isPresent()) {
            AzureBlockBlobTierObjectStoreConfig.AzureCredentialsConfig azureCredentialsConfig = azureBlockBlobTierObjectStoreConfig.azureCredentialsConfig.get();
            if (azureCredentialsConfig.connectionStringAuthMethod().booleanValue()) {
                credential = new BlobServiceClientBuilder().connectionString(azureCredentialsConfig.connectionString());
            } else {
                credential = new BlobServiceClientBuilder().endpoint(azureBlockBlobTierObjectStoreConfig.endpoint.get()).credential(new ClientSecretCredentialBuilder().clientId(azureCredentialsConfig.azureClientId()).tenantId(azureCredentialsConfig.azureTenantId()).clientSecret(azureCredentialsConfig.azureClientSecret()).build());
            }
        } else {
            credential = new BlobServiceClientBuilder().endpoint(azureBlockBlobTierObjectStoreConfig.endpoint.get()).credential(new DefaultAzureCredentialBuilder().build());
        }
        return credential;
    }

    private static BlobServiceClient createServiceClient(AzureBlockBlobTierObjectStoreConfig azureBlockBlobTierObjectStoreConfig) {
        return createServiceClientBuilder(azureBlockBlobTierObjectStoreConfig).buildClient();
    }

    private static BlobServiceAsyncClient createServiceAsyncClient(AzureBlockBlobTierObjectStoreConfig azureBlockBlobTierObjectStoreConfig) {
        return createServiceClientBuilder(azureBlockBlobTierObjectStoreConfig).buildAsyncClient();
    }

    private static BlobContainerClient createContainerClient(BlobServiceClient blobServiceClient, AzureBlockBlobTierObjectStoreConfig azureBlockBlobTierObjectStoreConfig) {
        BlobContainerClient blobContainerClient = blobServiceClient.getBlobContainerClient(azureBlockBlobTierObjectStoreConfig.container);
        if (blobContainerClient.exists()) {
            return blobContainerClient;
        }
        throw new TierObjectStoreFatalException("Container " + azureBlockBlobTierObjectStoreConfig.container + " does not exist or could not be found");
    }

    private static BlobContainerAsyncClient createContainerAsyncClient(BlobServiceAsyncClient blobServiceAsyncClient, AzureBlockBlobTierObjectStoreConfig azureBlockBlobTierObjectStoreConfig) {
        BlobContainerAsyncClient blobContainerAsyncClient = blobServiceAsyncClient.getBlobContainerAsyncClient(azureBlockBlobTierObjectStoreConfig.container);
        if (((Boolean) blobContainerAsyncClient.exists().blockOptional().orElse(false)).booleanValue()) {
            return blobContainerAsyncClient;
        }
        throw new TierObjectStoreFatalException("Container " + azureBlockBlobTierObjectStoreConfig.container + " does not exist or could not be found");
    }

    private String keyPath(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType) {
        return objectStoreMetadata.toFragmentLocation(this.prefix, objectType.getDesignatedFragmentType().get()).get().objectPath();
    }
}
