package org.apache.flink.fs.s3.common.writer;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;

/**
 * A {@link RecoverableFsDataOutputStream} that buffers written bytes in a local
 * {@link RefCountedFSOutputStream} and hands that buffer to a {@link RecoverableMultiPartUpload}
 * as one part whenever its size reaches the configured minimum part size.
 *
 * <p>The class is annotated {@code @NotThreadSafe}. The internal {@link ReentrantLock} only
 * guards the operations that swap, snapshot or release the current buffer ({@link #close()},
 * {@link #persist()}, {@link #closeForCommit()} and the part roll-over); plain writes are not
 * guarded.
 */
@PublicEvolving
@NotThreadSafe
public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {

    /** Guards the operations that replace, snapshot or release {@link #fileStream}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** The multi-part upload that receives every completed part. */
    private final RecoverableMultiPartUpload upload;

    /** Factory used to create the backing temporary file of every new part buffer. */
    private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileProvider;

    /** Buffer size (in bytes) at which the current part is uploaded and a new one is opened. */
    private final long userDefinedMinPartSize;

    /** Buffer holding the bytes of the part that is currently being written. */
    private RefCountedFSOutputStream fileStream;

    /** Number of bytes that belong to parts preceding the current buffer. */
    private long bytesBeforeCurrentPart;

    /**
     * Creates a stream on top of an already opened part buffer.
     *
     * @param recoverableMultiPartUpload the upload receiving the parts; must not be null
     * @param functionWithException factory for the temporary files of new part buffers; must not
     *     be null
     * @param refCountedFSOutputStream the buffer to start writing into; must not be null
     * @param j the buffer size at which a part is uploaded
     * @param j2 the number of bytes already contained in earlier parts; must be {@code >= 0}
     */
    S3RecoverableFsDataOutputStream(RecoverableMultiPartUpload recoverableMultiPartUpload, FunctionWithException<File, RefCountedFileWithStream, IOException> functionWithException, RefCountedFSOutputStream refCountedFSOutputStream, long j, long j2) {
        Preconditions.checkArgument(j2 >= 0);
        this.upload = Preconditions.checkNotNull(recoverableMultiPartUpload);
        this.tmpFileProvider = Preconditions.checkNotNull(functionWithException);
        this.userDefinedMinPartSize = j;
        // Fail fast here instead of with a NullPointerException on the first write.
        this.fileStream = Preconditions.checkNotNull(refCountedFSOutputStream);
        this.bytesBeforeCurrentPart = j2;
    }

    /**
     * Writes a single byte into the current part buffer.
     *
     * <p>Unlike {@link #write(byte[], int, int)} this does not check whether the buffer has
     * reached the minimum part size; the check happens on the next array write, {@link #flush()}
     * or {@link #persist()}.
     *
     * @param i the byte to write
     * @throws IOException if the underlying buffer rejects the write
     */
    @Override
    public void write(int i) throws IOException {
        fileStream.write(i);
    }

    /**
     * Writes {@code i2} bytes of {@code bArr}, starting at offset {@code i}, into the current
     * part buffer and rolls over to a new part if the buffer reached the minimum part size.
     *
     * @param bArr the source array
     * @param i the offset of the first byte to write
     * @param i2 the number of bytes to write
     * @throws IOException if writing or uploading the part fails
     */
    @Override
    public void write(byte[] bArr, int i, int i2) throws IOException {
        fileStream.write(bArr, i, i2);
        openNewPartIfNecessary(userDefinedMinPartSize);
    }

    /**
     * Flushes the current part buffer and rolls over to a new part if the buffer reached the
     * minimum part size.
     *
     * @throws IOException if flushing or uploading the part fails
     */
    @Override
    public void flush() throws IOException {
        fileStream.flush();
        openNewPartIfNecessary(userDefinedMinPartSize);
    }

    /**
     * Returns the logical position in the stream: the bytes of all earlier parts plus the
     * position within the current part buffer.
     *
     * @return the total number of bytes accounted for by this stream
     * @throws IOException if the current buffer cannot report its position
     */
    @Override
    public long getPos() throws IOException {
        return bytesBeforeCurrentPart + fileStream.getPos();
    }

    /**
     * Delegates to {@code sync()} of the current part buffer.
     *
     * @throws IOException if the buffer fails to sync
     */
    @Override
    public void sync() throws IOException {
        fileStream.sync();
    }

    /**
     * Flushes the current part buffer, closes it quietly and releases this stream's reference to
     * it. The buffer is not uploaded by this method.
     *
     * @throws IOException if the lock acquisition is interrupted or the flush fails
     */
    @Override
    public void close() throws IOException {
        lock();
        try {
            fileStream.flush();
        } finally {
            // The nested finally guarantees that the lock is released even when closing or
            // releasing the buffer throws an unchecked exception.
            try {
                IOUtils.closeQuietly((OutputStream) fileStream);
                fileStream.release();
            } finally {
                unlock();
            }
        }
    }

    /**
     * Flushes the current buffer, uploads it as a part if it reached the minimum part size, and
     * asks the upload for a recoverable snapshot that includes the (possibly new) current buffer.
     *
     * @return the recoverable produced by the underlying upload
     * @throws IOException if the lock acquisition is interrupted or any I/O step fails
     */
    @Override
    public RecoverableWriter.ResumeRecoverable persist() throws IOException {
        lock();
        try {
            fileStream.flush();
            openNewPartIfNecessary(userDefinedMinPartSize);
            return upload.snapshotAndGetRecoverable(fileStream);
        } finally {
            unlock();
        }
    }

    /**
     * Closes the current buffer, uploads it as the last part if it is non-empty, and returns the
     * committer produced by the underlying upload.
     *
     * @return the committer for the upload
     * @throws IOException if the lock acquisition is interrupted or any I/O step fails
     */
    @Override
    public RecoverableFsDataOutputStream.Committer closeForCommit() throws IOException {
        lock();
        try {
            closeAndUploadPart();
            return upload.snapshotAndGetCommitter();
        } finally {
            unlock();
        }
    }

    /**
     * Uploads the current buffer and opens a fresh one if the buffer holds at least
     * {@code sizeThreshold} bytes; otherwise does nothing.
     */
    private void openNewPartIfNecessary(long sizeThreshold) throws IOException {
        final long currentPartSize = fileStream.getPos();
        if (currentPartSize < sizeThreshold) {
            return;
        }

        // The lock is reentrant, so this is safe when the caller (e.g. persist()) already holds it.
        lock();
        try {
            uploadCurrentAndOpenNewPart(currentPartSize);
        } finally {
            unlock();
        }
    }

    /**
     * Accounts {@code partLength} bytes to the finished parts, uploads the current buffer and
     * replaces it with a newly opened one.
     */
    private void uploadCurrentAndOpenNewPart(long partLength) throws IOException {
        bytesBeforeCurrentPart += partLength;
        closeAndUploadPart();
        fileStream = RefCountedBufferingFileStream.openNew(tmpFileProvider);
    }

    /**
     * Flushes and closes the current buffer, hands it to the upload when it contains data, and
     * finally releases this stream's reference to it.
     */
    private void closeAndUploadPart() throws IOException {
        fileStream.flush();
        fileStream.close();
        // Empty buffers are skipped so that no zero-length part is handed to the upload.
        if (fileStream.getPos() > 0) {
            upload.uploadPart(fileStream);
        }
        fileStream.release();
    }

    /**
     * Acquires the internal lock, translating an interruption into an {@link IOException} while
     * keeping the thread's interrupt flag set.
     */
    private void lock() throws IOException {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the original exception as the cause so the interruption remains traceable.
            throw new IOException("interrupted", e);
        }
    }

    /** Releases the internal lock. */
    private void unlock() {
        lock.unlock();
    }

    /**
     * Creates a stream for a new upload, starting with a freshly opened, empty part buffer.
     *
     * @param recoverableMultiPartUpload the upload receiving the parts
     * @param functionWithException factory for the temporary files of the part buffers
     * @param j the buffer size at which a part is uploaded; must be at least
     *     {@link FlinkS3FileSystem#S3_MULTIPART_MIN_PART_SIZE}
     * @return the new stream, positioned at 0
     * @throws IOException if the initial part buffer cannot be opened
     */
    public static S3RecoverableFsDataOutputStream newStream(RecoverableMultiPartUpload recoverableMultiPartUpload, FunctionWithException<File, RefCountedFileWithStream, IOException> functionWithException, long j) throws IOException {
        checkMinPartSize(j);
        final RefCountedBufferingFileStream initialBuffer =
                boundedBufferingFileStream(functionWithException, Optional.empty());
        return new S3RecoverableFsDataOutputStream(recoverableMultiPartUpload, functionWithException, initialBuffer, j, 0L);
    }

    /**
     * Creates a stream that continues a previously started upload. The initial part buffer is
     * restored from the upload's incomplete part if there is one, otherwise a new buffer is
     * opened.
     *
     * @param recoverableMultiPartUpload the upload to continue
     * @param functionWithException factory for the temporary files of the part buffers
     * @param j the buffer size at which a part is uploaded; must be at least
     *     {@link FlinkS3FileSystem#S3_MULTIPART_MIN_PART_SIZE}
     * @param j2 the number of bytes already contained in earlier parts; must be {@code >= 0}
     * @return the recovered stream
     * @throws IOException if the initial part buffer cannot be opened or restored
     */
    public static S3RecoverableFsDataOutputStream recoverStream(RecoverableMultiPartUpload recoverableMultiPartUpload, FunctionWithException<File, RefCountedFileWithStream, IOException> functionWithException, long j, long j2) throws IOException {
        checkMinPartSize(j);
        final RefCountedBufferingFileStream initialBuffer =
                boundedBufferingFileStream(functionWithException, recoverableMultiPartUpload.getIncompletePart());
        return new S3RecoverableFsDataOutputStream(recoverableMultiPartUpload, functionWithException, initialBuffer, j, j2);
    }

    /** Rejects part sizes below {@link FlinkS3FileSystem#S3_MULTIPART_MIN_PART_SIZE}. */
    private static void checkMinPartSize(long minPartSize) {
        Preconditions.checkArgument(
                minPartSize >= FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE,
                "Minimum part size " + minPartSize + " is below the S3 multipart minimum of "
                        + FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE);
    }

    /**
     * Opens the initial part buffer: restored from {@code optional} when a file is present,
     * newly created otherwise.
     */
    private static RefCountedBufferingFileStream boundedBufferingFileStream(FunctionWithException<File, RefCountedFileWithStream, IOException> functionWithException, Optional<File> optional) throws IOException {
        if (optional.isPresent()) {
            return RefCountedBufferingFileStream.restore(functionWithException, optional.get());
        }
        return RefCountedBufferingFileStream.openNew(functionWithException);
    }
}
