package kafka.tier.store;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.TierObjectStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/store/GcsTierObjectStore.class */
public class GcsTierObjectStore implements TierObjectStore {
    private static final Logger log = LoggerFactory.getLogger(GcsTierObjectStore.class);
    private static final int UNKNOWN_END_RANGE_CHUNK_SIZE = 1000000;
    private final Optional<String> clusterIdOpt;
    private final Optional<Integer> brokerIdOpt;
    private final String bucket;
    private final String prefix;
    private final int writeChunkSize;
    private final Storage storage;

    /* loaded from: input_file:kafka/tier/store/GcsTierObjectStore$GcsTierObjectStoreResponse.class */
    private static class GcsTierObjectStoreResponse implements TierObjectStoreResponse {
        private final InputStream inputStream;

        GcsTierObjectStoreResponse(ReadChannel readChannel, long j, OptionalInt optionalInt) throws IOException {
            int orElse = optionalInt.orElse(GcsTierObjectStore.UNKNOWN_END_RANGE_CHUNK_SIZE);
            readChannel.seek(j);
            readChannel.setChunkSize(orElse);
            this.inputStream = Channels.newInputStream((ReadableByteChannel) readChannel);
        }

        @Override // kafka.tier.store.TierObjectStoreResponse, java.lang.AutoCloseable
        public void close() throws IOException {
            this.inputStream.close();
        }

        @Override // kafka.tier.store.TierObjectStoreResponse
        public InputStream getInputStream() {
            return this.inputStream;
        }
    }

    public GcsTierObjectStore(GcsTierObjectStoreConfig gcsTierObjectStoreConfig) {
        this(storage(gcsTierObjectStoreConfig), gcsTierObjectStoreConfig);
    }

    GcsTierObjectStore(Storage storage, GcsTierObjectStoreConfig gcsTierObjectStoreConfig) {
        this.clusterIdOpt = gcsTierObjectStoreConfig.clusterIdOpt;
        this.brokerIdOpt = gcsTierObjectStoreConfig.brokerIdOpt;
        this.storage = storage;
        this.bucket = gcsTierObjectStoreConfig.gcsBucket;
        this.prefix = gcsTierObjectStoreConfig.gcsPrefix;
        this.writeChunkSize = gcsTierObjectStoreConfig.gcsWriteChunkSize.intValue();
        expectBucket(this.bucket, gcsTierObjectStoreConfig.gcsRegion);
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectStore.Backend getBackend() {
        return TierObjectStore.Backend.GCS;
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectStoreResponse getObject(TierObjectStore.ObjectStoreMetadata objectStoreMetadata, TierObjectStore.FileType fileType, Integer num, Integer num2) {
        String keyPath = keyPath(objectStoreMetadata, fileType);
        BlobId of = BlobId.of(this.bucket, keyPath);
        if (num != null && num2 != null && num.intValue() > num2.intValue()) {
            throw new IllegalStateException("Invalid range of byteOffsetStart and byteOffsetEnd");
        }
        if (num == null && num2 != null) {
            throw new IllegalStateException("Cannot specify a byteOffsetEnd without specifying a byteOffsetStart");
        }
        log.debug("Fetching object from gs://{}/{}, with range of {} to {}", new Object[]{this.bucket, keyPath, num, num2});
        try {
            return new GcsTierObjectStoreResponse(this.storage.reader(of, new Storage.BlobSourceOption[0]), num == null ? 0L : num.longValue(), num2 == null ? OptionalInt.empty() : OptionalInt.of(num2.intValue() - num.intValue()));
        } catch (Exception e) {
            throw new TierObjectStoreFatalException(String.format("Unknown exception when fetching object, blobId: %s metadata: %s type: %s range %s-%s", of, objectStoreMetadata, fileType, num, num2), e);
        } catch (StorageException e2) {
            throw new TierObjectStoreRetriableException(String.format("Failed to fetch object, blobId: %s metadata: %s type: %s range %s-%s", of, objectStoreMetadata, fileType, num, num2), e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void putSegment(TierObjectStore.ObjectMetadata objectMetadata, File file, File file2, File file3, Optional<File> optional, Optional<ByteBuffer> optional2, Optional<ByteBuffer> optional3) {
        Map<String, String> objectMetadata2 = objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.SEGMENT), objectMetadata2, file);
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.OFFSET_INDEX), objectMetadata2, file2);
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.TIMESTAMP_INDEX), objectMetadata2, file3);
            if (optional.isPresent()) {
                putFile(keyPath(objectMetadata, TierObjectStore.FileType.PRODUCER_STATE), objectMetadata2, optional.get());
            }
            if (optional2.isPresent()) {
                putBuf(keyPath(objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX), objectMetadata2, optional2.get());
            }
            if (optional3.isPresent()) {
                putBuf(keyPath(objectMetadata, TierObjectStore.FileType.EPOCH_STATE), objectMetadata2, optional3.get());
            }
        } catch (StorageException e) {
            throw new TierObjectStoreRetriableException("Failed to upload segment: " + objectMetadata, e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment: " + objectMetadata, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void putObject(TierObjectStore.ObjectStoreMetadata objectStoreMetadata, File file, TierObjectStore.FileType fileType) {
        try {
            putFile(keyPath(objectStoreMetadata, fileType), objectStoreMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt), file);
        } catch (StorageException e) {
            throw new TierObjectStoreRetriableException(String.format("Failed to upload object %s, file %s, type %s", objectStoreMetadata, file, fileType), e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException(String.format("Failed to upload object %s, file %s, type %s", objectStoreMetadata, file, fileType), e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void deleteSegment(TierObjectStore.ObjectMetadata objectMetadata) {
        Blob blob;
        ArrayList arrayList = new ArrayList();
        for (TierObjectStore.FileType fileType : TierObjectStore.FileType.values()) {
            switch (fileType) {
                case TRANSACTION_INDEX:
                    if (objectMetadata.hasAbortedTxns()) {
                        arrayList.add(BlobId.of(this.bucket, keyPath(objectMetadata, fileType)));
                        break;
                    } else {
                        break;
                    }
                case PRODUCER_STATE:
                    if (objectMetadata.hasProducerState()) {
                        arrayList.add(BlobId.of(this.bucket, keyPath(objectMetadata, fileType)));
                        break;
                    } else {
                        break;
                    }
                case EPOCH_STATE:
                    if (objectMetadata.hasEpochState()) {
                        arrayList.add(BlobId.of(this.bucket, keyPath(objectMetadata, fileType)));
                        break;
                    } else {
                        break;
                    }
                default:
                    arrayList.add(BlobId.of(this.bucket, keyPath(objectMetadata, fileType)));
                    break;
            }
        }
        log.debug("Deleting " + arrayList);
        ArrayList arrayList2 = new ArrayList();
        try {
            List delete = this.storage.delete(arrayList);
            log.debug("Deletion result " + delete);
            for (int i = 0; i < delete.size(); i++) {
                if (!((Boolean) delete.get(i)).booleanValue() && (blob = this.storage.get((BlobId) arrayList.get(i))) != null) {
                    log.warn("Found object " + blob.getBlobId() + " that was expected to be deleted of " + objectMetadata);
                    arrayList2.add(blob.getBlobId());
                }
            }
            if (!arrayList2.isEmpty()) {
                throw new TierObjectStoreRetriableException("Deletion failed for " + objectMetadata + ". Blobs still exist in object storage with blob ids: " + arrayList2);
            }
        } catch (Exception e) {
            throw new TierObjectStoreFatalException("Unknown exception when deleting segment: " + objectMetadata, e);
        } catch (StorageException e2) {
            throw new TierObjectStoreRetriableException("Failed to delete segment: " + objectMetadata, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void close() {
    }

    private void putFile(String str, Map<String, String> map, File file) throws IOException {
        BlobInfo build = BlobInfo.newBuilder(BlobId.of(this.bucket, str)).setMetadata(map).build();
        log.debug("Uploading object to gs://{}/{}", this.bucket, str);
        WritableByteChannel writer = this.storage.writer(build, new Storage.BlobWriteOption[0]);
        Throwable th = null;
        try {
            FileChannel open = FileChannel.open(file.toPath(), StandardOpenOption.READ);
            Throwable th2 = null;
            try {
                try {
                    if (this.writeChunkSize > 0) {
                        writer.setChunkSize(this.writeChunkSize);
                    }
                    long length = file.length();
                    for (long j = 0; j < length; j += open.transferTo(j, length, writer)) {
                    }
                    if (open != null) {
                        if (0 != 0) {
                            try {
                                open.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            open.close();
                        }
                    }
                    if (writer != null) {
                        if (0 == 0) {
                            writer.close();
                            return;
                        }
                        try {
                            writer.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (open != null) {
                    if (th2 != null) {
                        try {
                            open.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        open.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (writer != null) {
                if (0 != 0) {
                    try {
                        writer.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    writer.close();
                }
            }
            throw th8;
        }
    }

    private void putBuf(String str, Map<String, String> map, ByteBuffer byteBuffer) throws IOException {
        ByteBuffer duplicate = byteBuffer.duplicate();
        BlobInfo build = BlobInfo.newBuilder(BlobId.of(this.bucket, str)).setMetadata(map).build();
        log.debug("Uploading object gs://{}/{}", this.bucket, str);
        WriteChannel writer = this.storage.writer(build, new Storage.BlobWriteOption[0]);
        Throwable th = null;
        try {
            try {
                if (this.writeChunkSize > 0) {
                    writer.setChunkSize(this.writeChunkSize);
                }
                while (duplicate.hasRemaining()) {
                    writer.write(duplicate);
                }
                if (writer != null) {
                    if (0 == 0) {
                        writer.close();
                        return;
                    }
                    try {
                        writer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (writer != null) {
                if (th != null) {
                    try {
                        writer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    writer.close();
                }
            }
            throw th4;
        }
    }

    private static Storage storage(GcsTierObjectStoreConfig gcsTierObjectStoreConfig) {
        if (!gcsTierObjectStoreConfig.gcsCredFilePath.isPresent()) {
            return StorageOptions.getDefaultInstance().getService();
        }
        try {
            return StorageOptions.newBuilder().setCredentials(GoogleCredentials.fromStream(new FileInputStream(gcsTierObjectStoreConfig.gcsCredFilePath.get())).createScoped(Lists.newArrayList(new String[]{"https://www.googleapis.com/auth/cloud-platform"}))).build().getService();
        } catch (IOException e) {
            throw new TierObjectStoreFatalException("Error in opening GCS credentials file", e);
        }
    }

    private void expectBucket(String str, String str2) throws TierObjectStoreFatalException {
        try {
            Bucket bucket = this.storage.get(str, new Storage.BucketGetOption[]{Storage.BucketGetOption.fields(new Storage.BucketField[]{Storage.BucketField.LOCATION})});
            if (bucket == null) {
                throw new TierObjectStoreFatalException("Configured bucket " + str + " does not exist or could not be found");
            }
            String location = bucket.getLocation();
            if (str2.equalsIgnoreCase(location)) {
                return;
            }
            log.warn("Bucket region {} does not match expected region {}", location, str2);
        } catch (StorageException e) {
            throw new TierObjectStoreFatalException("Unable to access bucket " + str, e);
        }
    }

    private String keyPath(TierObjectStore.ObjectStoreMetadata objectStoreMetadata, TierObjectStore.FileType fileType) {
        return objectStoreMetadata.toPath(this.prefix, fileType);
    }
}
