package org.apache.iceberg.aws.s3;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicates;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.utils.BinaryUtils;

/* loaded from: input_file:org/apache/iceberg/aws/s3/S3OutputStream.class */
/**
 * A {@link PositionOutputStream} that buffers written bytes in local staging files and uploads
 * them to S3.
 *
 * <p>Bytes are written to temporary files under the configured staging directory, each holding at
 * most {@code multiPartSize} bytes. While the total number of bytes written stays below
 * {@code multiPartThresholdSize}, nothing is sent until {@link #close()}, which issues a single
 * {@code PutObject}. Once the threshold is reached, a multipart upload is created and every
 * finished staging file is uploaded asynchronously as one part; {@link #close()} then uploads the
 * last part and completes the multipart upload.
 *
 * <p>When checksums are enabled, an MD5 digest is computed per part (and for the whole object
 * until the stream switches to multipart) and sent as {@code Content-MD5}.
 */
class S3OutputStream extends PositionOutputStream {
    private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class);
    private static final String DIGEST_ALGORITHM = "MD5";

    /**
     * Part-upload pool shared by all streams. It is created by the first constructor call, so its
     * size comes from the properties of whichever stream is constructed first.
     */
    private static volatile ExecutorService executorService;

    private final StackTraceElement[] createStack;
    private final S3Client s3;
    private final S3URI location;
    private final S3FileIOProperties s3FileIOProperties;
    private final Set<Tag> writeTags;
    private CountingOutputStream stream;
    private final File stagingDirectory;
    private File currentStagingFile;
    private String multipartUploadId;
    private final int multiPartSize;
    private final int multiPartThresholdSize;
    private final boolean isChecksumEnabled;
    private final MessageDigest completeMessageDigest;
    private MessageDigest currentPartMessageDigest;
    private final Counter writeBytes;
    private final Counter writeOperations;
    private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
    private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
    private long pos = 0;
    private boolean closed = false;

    /** Pairs a staging file with the digest accumulated while it was written (null if disabled). */
    public static class FileAndDigest {
        private final File file;
        private final MessageDigest digest;

        FileAndDigest(File file, MessageDigest messageDigest) {
            this.file = file;
            this.digest = messageDigest;
        }

        File file() {
            return this.file;
        }

        /**
         * Finalizes and returns the digest bytes. Callers must check {@link #hasDigest()} first;
         * {@link MessageDigest#digest()} also resets the digest, so call this once per file.
         */
        byte[] digest() {
            return this.digest.digest();
        }

        public boolean hasDigest() {
            return this.digest != null;
        }
    }

    /**
     * Supplies the concatenated content of the staging files for a single {@code PutObject}.
     *
     * <p>The SDK may request a new stream more than once (for example on retry) and only closes
     * the last stream it was given, so this provider closes the previously returned stream before
     * opening a new one.
     */
    private static final class StagedContentProvider implements ContentStreamProvider {
        private final List<File> files;
        private InputStream lastStream;

        StagedContentProvider(List<File> files) {
            this.files = files;
        }

        @Override
        public InputStream newStream() {
            closeQuietly(this.lastStream);
            this.lastStream = null;
            this.lastStream = new BufferedInputStream(openAll(this.files));
            return this.lastStream;
        }
    }

    /**
     * Creates the stream and opens the first staging file.
     *
     * @param s3Client client used for all S3 calls
     * @param s3uri destination bucket and key
     * @param s3FileIOProperties part size, threshold factor, staging directory, checksum, tags,
     *     encryption and permission settings
     * @param metricsContext source of the write-bytes and write-operations counters
     * @throws IOException if the staging directory or first staging file cannot be created
     */
    public S3OutputStream(S3Client s3Client, S3URI s3uri, S3FileIOProperties s3FileIOProperties, MetricsContext metricsContext) throws IOException {
        if (executorService == null) {
            synchronized (S3OutputStream.class) {
                if (executorService == null) {
                    executorService =
                        MoreExecutors.getExitingExecutorService(
                            (ThreadPoolExecutor)
                                Executors.newFixedThreadPool(
                                    s3FileIOProperties.multipartUploadThreads(),
                                    new ThreadFactoryBuilder()
                                        .setDaemon(true)
                                        .setNameFormat("iceberg-s3fileio-upload-%d")
                                        .build()));
                }
            }
        }

        this.s3 = s3Client;
        this.location = s3uri;
        this.s3FileIOProperties = s3FileIOProperties;
        this.writeTags = s3FileIOProperties.writeTags();
        // Captured so finalize() can report where an unclosed stream was created.
        this.createStack = Thread.currentThread().getStackTrace();
        this.multiPartSize = s3FileIOProperties.multiPartSize();
        this.multiPartThresholdSize = (int) (this.multiPartSize * s3FileIOProperties.multipartThresholdFactor());
        this.stagingDirectory = new File(s3FileIOProperties.stagingDirectory());
        this.isChecksumEnabled = s3FileIOProperties.isChecksumEnabled();
        try {
            this.completeMessageDigest = this.isChecksumEnabled ? MessageDigest.getInstance(DIGEST_ALGORITHM) : null;
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("Failed to create message digest needed for s3 checksum checks", e);
        }

        this.writeBytes = metricsContext.counter(FileIOMetricsContext.WRITE_BYTES, MetricsContext.Unit.BYTES);
        this.writeOperations = metricsContext.counter(FileIOMetricsContext.WRITE_OPERATIONS);
        newStream();
    }

    /** Returns the total number of bytes written to this stream so far. */
    @Override // org.apache.iceberg.io.PositionOutputStream
    public long getPos() {
        return this.pos;
    }

    /** Flushes buffered bytes to the current staging file; nothing is sent to S3 by this call. */
    @Override // java.io.OutputStream, java.io.Flushable
    public void flush() throws IOException {
        this.stream.flush();
    }

    /**
     * Writes one byte, rolling over to a new staging file first if the current one is full.
     *
     * @param b the byte to write
     * @throws IOException if writing to, or rolling, the staging file fails
     */
    @Override // java.io.OutputStream
    public void write(int b) throws IOException {
        if (this.stream.getCount() >= this.multiPartSize) {
            newStream();
            uploadParts();
        }

        this.stream.write(b);
        this.pos++;
        this.writeBytes.increment();
        this.writeOperations.increment();

        startMultipartUploadIfThresholdReached();
    }

    /**
     * Writes {@code len} bytes, splitting them across staging files so that no file exceeds the
     * part size.
     *
     * @param b source buffer
     * @param off offset of the first byte to write
     * @param len number of bytes to write
     * @throws IOException if writing to, or rolling, a staging file fails
     */
    @Override // java.io.OutputStream
    public void write(byte[] b, int off, int len) throws IOException {
        int remaining = len;
        int offset = off;

        // Top up the current staging file to exactly the part size, then roll to a new file,
        // repeating for as long as the remaining bytes would overflow the current file.
        while (this.stream.getCount() + remaining > this.multiPartSize) {
            int writeSize = this.multiPartSize - ((int) this.stream.getCount());
            this.stream.write(b, offset, writeSize);
            remaining -= writeSize;
            offset += writeSize;
            newStream();
            uploadParts();
        }

        this.stream.write(b, offset, remaining);
        this.pos += len;
        this.writeBytes.increment(len);
        this.writeOperations.increment();

        startMultipartUploadIfThresholdReached();
    }

    /** Switches to multipart upload the first time the written size reaches the threshold. */
    private void startMultipartUploadIfThresholdReached() {
        if (this.multipartUploadId == null && this.pos >= this.multiPartThresholdSize) {
            initializeMultiPartUpload();
            uploadParts();
        }
    }

    /**
     * Closes the current staging file (if any) and opens a new one, registering it in
     * {@code stagingFiles} together with its per-part digest.
     */
    private void newStream() throws IOException {
        if (this.stream != null) {
            this.stream.close();
        }

        createStagingDirectoryIfNotExists();
        this.currentStagingFile = File.createTempFile("s3fileio-", ".tmp", this.stagingDirectory);
        this.currentStagingFile.deleteOnExit();

        try {
            this.currentPartMessageDigest = this.isChecksumEnabled ? MessageDigest.getInstance(DIGEST_ALGORITHM) : null;
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("Failed to create message digest needed for s3 checksum checks.", e);
        }

        this.stagingFiles.add(new FileAndDigest(this.currentStagingFile, this.currentPartMessageDigest));

        BufferedOutputStream fileOut = new BufferedOutputStream(new FileOutputStream(this.currentStagingFile));
        if (!this.isChecksumEnabled) {
            this.stream = new CountingOutputStream(fileOut);
            return;
        }

        DigestOutputStream digestOut = new DigestOutputStream(fileOut, this.currentPartMessageDigest);
        if (this.multipartUploadId == null) {
            // Not multipart yet: the object may still go out as one PutObject, which needs the
            // digest of the complete content in addition to the per-part digest.
            digestOut = new DigestOutputStream(digestOut, this.completeMessageDigest);
        }
        this.stream = new CountingOutputStream(digestOut);
    }

    /**
     * Closes the stream and finishes the upload. Subsequent calls are no-ops. Staging files are
     * deleted whether or not the upload succeeds.
     *
     * @throws IOException if closing the current staging file fails
     */
    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed) {
            return;
        }

        super.close();
        // Set before uploading: uploadParts() only includes the current staging file once closed.
        this.closed = true;

        try {
            this.stream.close();
            completeUploads();
        } finally {
            cleanUpStagingFiles();
        }
    }

    /** Creates the multipart upload (with tags, encryption and permissions) and stores its id. */
    private void initializeMultiPartUpload() {
        CreateMultipartUploadRequest.Builder requestBuilder =
            CreateMultipartUploadRequest.builder().bucket(this.location.bucket()).key(this.location.key());
        if (this.writeTags != null && !this.writeTags.isEmpty()) {
            requestBuilder.tagging(Tagging.builder().tagSet(this.writeTags).build());
        }

        S3RequestUtil.configureEncryption(this.s3FileIOProperties, requestBuilder);
        S3RequestUtil.configurePermission(this.s3FileIOProperties, requestBuilder);

        this.multipartUploadId = this.s3.createMultipartUpload(requestBuilder.build()).uploadId();
    }

    /**
     * Submits an asynchronous part upload for every staging file that is finished and not yet
     * submitted. Does nothing until a multipart upload has been created.
     */
    private void uploadParts() {
        if (this.multipartUploadId == null) {
            return;
        }

        for (int index = 0; index < this.stagingFiles.size(); index++) {
            FileAndDigest staged = this.stagingFiles.get(index);
            File file = staged.file();

            // The file still being written is only uploaded once the stream is closed.
            boolean stillBeingWritten = !this.closed && file.equals(this.currentStagingFile);
            if (stillBeingWritten || this.multiPartMap.containsKey(file)) {
                continue;
            }

            // Part numbers are 1-based positions in the staging list.
            submitPartUpload(staged, index + 1);
        }
    }

    /** Builds the request for one part and schedules its upload on the shared executor. */
    private void submitPartUpload(FileAndDigest staged, int partNumber) {
        File file = staged.file();
        UploadPartRequest.Builder requestBuilder =
            UploadPartRequest.builder()
                .bucket(this.location.bucket())
                .key(this.location.key())
                .uploadId(this.multipartUploadId)
                .partNumber(partNumber)
                .contentLength(file.length());

        if (staged.hasDigest()) {
            requestBuilder.contentMD5(BinaryUtils.toBase64(staged.digest()));
        }

        S3RequestUtil.configureEncryption(this.s3FileIOProperties, requestBuilder);
        UploadPartRequest uploadRequest = requestBuilder.build();

        CompletableFuture<CompletedPart> upload =
            CompletableFuture.supplyAsync(
                    () -> {
                        String eTag = this.s3.uploadPart(uploadRequest, RequestBody.fromFile(file)).eTag();
                        return CompletedPart.builder().eTag(eTag).partNumber(uploadRequest.partNumber()).build();
                    },
                    executorService)
                .whenComplete(
                    (completedPart, thrown) -> {
                        // The staging file is no longer needed whether the upload worked or not.
                        try {
                            Files.deleteIfExists(file.toPath());
                        } catch (IOException e) {
                            LOG.warn("Failed to delete staging file: {}", file, e);
                        }

                        if (thrown != null) {
                            LOG.error("Failed to upload part: {}", uploadRequest, thrown);
                        }
                    });

        this.multiPartMap.put(file, upload);
    }

    /**
     * Waits for all part uploads and completes the multipart upload. If any part failed, the
     * remaining uploads are cancelled, the multipart upload is aborted and the failure rethrown.
     */
    private void completeMultiPartUpload() {
        Preconditions.checkState(this.closed, "Complete upload called on open stream: " + this.location);

        try {
            List<CompletedPart> completedParts =
                this.multiPartMap.values().stream()
                    .map(CompletableFuture::join)
                    .sorted(Comparator.comparing(CompletedPart::partNumber))
                    .collect(Collectors.toList());

            CompleteMultipartUploadRequest request =
                CompleteMultipartUploadRequest.builder()
                    .bucket(this.location.bucket())
                    .key(this.location.key())
                    .uploadId(this.multipartUploadId)
                    .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
                    .build();

            Tasks.foreach(request)
                .noRetry()
                .onFailure(
                    (failedRequest, thrown) -> {
                        LOG.error("Failed to complete multipart upload request: {}", failedRequest, thrown);
                        abortUpload();
                    })
                .throwFailureWhenFinished()
                .run(this.s3::completeMultipartUpload);
        } catch (CompletionException e) {
            // A part failed: stop whatever is still running and discard the upload.
            this.multiPartMap.values().forEach(pending -> pending.cancel(true));
            abortUpload();
            throw e;
        }
    }

    /** Aborts the multipart upload, if one was started, and removes the staging files. */
    private void abortUpload() {
        if (this.multipartUploadId != null) {
            try {
                this.s3.abortMultipartUpload(
                    AbortMultipartUploadRequest.builder()
                        .bucket(this.location.bucket())
                        .key(this.location.key())
                        .uploadId(this.multipartUploadId)
                        .build());
            } finally {
                cleanUpStagingFiles();
            }
        }
    }

    /** Deletes every staging file, logging (not throwing) on failure. */
    private void cleanUpStagingFiles() {
        Tasks.foreach(this.stagingFiles.stream().map(FileAndDigest::file))
            .suppressFailureWhenFinished()
            .onFailure((file, thrown) -> LOG.warn("Failed to delete staging file: {}", file, thrown))
            .run(File::delete);
    }

    /**
     * Finishes the upload: completes the multipart upload if one was started, otherwise sends all
     * staged content in a single {@code PutObject}.
     */
    private void completeUploads() {
        if (this.multipartUploadId != null) {
            uploadParts();
            completeMultiPartUpload();
            return;
        }

        List<File> files = this.stagingFiles.stream().map(FileAndDigest::file).collect(Collectors.toList());
        long contentLength = files.stream().mapToLong(File::length).sum();
        ContentStreamProvider contentProvider = new StagedContentProvider(files);

        PutObjectRequest.Builder requestBuilder =
            PutObjectRequest.builder().bucket(this.location.bucket()).key(this.location.key());
        if (this.writeTags != null && !this.writeTags.isEmpty()) {
            requestBuilder.tagging(Tagging.builder().tagSet(this.writeTags).build());
        }

        if (this.isChecksumEnabled) {
            requestBuilder.contentMD5(BinaryUtils.toBase64(this.completeMessageDigest.digest()));
        }

        S3RequestUtil.configureEncryption(this.s3FileIOProperties, requestBuilder);
        S3RequestUtil.configurePermission(this.s3FileIOProperties, requestBuilder);

        this.s3.putObject(
            requestBuilder.build(),
            RequestBody.fromContentProvider(contentProvider, contentLength, "application/octet-stream"));
    }

    /**
     * Opens every file and chains the streams in order. If one file cannot be opened, the streams
     * opened so far are closed before the failure is rethrown.
     */
    private static InputStream openAll(List<File> files) {
        List<InputStream> opened = Lists.newArrayList();
        try {
            for (File file : files) {
                opened.add(uncheckedInputStream(file));
            }
        } catch (RuntimeException e) {
            opened.forEach(S3OutputStream::closeQuietly);
            throw e;
        }

        return opened.stream()
            .reduce(SequenceInputStream::new)
            .orElseGet(() -> new ByteArrayInputStream(new byte[0]));
    }

    /** Closes the stream if non-null, logging instead of propagating any failure. */
    private static void closeQuietly(InputStream in) {
        if (in == null) {
            return;
        }

        try {
            in.close();
        } catch (IOException e) {
            LOG.warn("Failed to close staging file input stream", e);
        }
    }

    /** Opens the file, converting the checked {@link IOException} so it can be used in lambdas. */
    private static InputStream uncheckedInputStream(File file) {
        try {
            return new FileInputStream(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Creates the staging directory when it is missing. A failed {@code mkdirs()} is tolerated if
     * the directory exists afterwards, since something else may have created it concurrently.
     *
     * @throws IOException if the directory does not exist and could not be created
     */
    private void createStagingDirectoryIfNotExists() throws IOException, SecurityException {
        if (this.stagingDirectory.exists()) {
            return;
        }

        LOG.info("Staging directory does not exist, trying to create one: {}", this.stagingDirectory.getAbsolutePath());
        if (this.stagingDirectory.mkdirs()) {
            LOG.info("Successfully created staging directory: {}", this.stagingDirectory.getAbsolutePath());
            return;
        }

        if (!this.stagingDirectory.exists()) {
            throw new IOException("Failed to create staging directory due to some unknown reason: " + this.stagingDirectory.getAbsolutePath());
        }

        LOG.info("Successfully created staging directory by another process: {}", this.stagingDirectory.getAbsolutePath());
    }

    /**
     * Safety net for streams that were never closed: closes the stream, then logs the stack trace
     * captured at construction. Closing comes first so resources are released even if logging fails.
     */
    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        if (!this.closed) {
            close();
            String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(this.createStack, 1, this.createStack.length));
            LOG.warn("Unclosed output stream created by:\n\t{}", trace);
        }
    }
}
