package org.apache.flink.fs.osshadoop.writer;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.RefCountedBufferingFileStream;
import org.apache.flink.core.fs.RefCountedFileWithStream;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.FunctionWithException;

@PublicEvolving
@NotThreadSafe
/* loaded from: input_file:org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStream.class */
public class OSSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
    private final ReentrantLock lock = new ReentrantLock();
    private long ossUploadPartSize;
    private FunctionWithException<File, RefCountedFileWithStream, IOException> cachedFileCreator;
    private RefCountedBufferingFileStream fileStream;
    private OSSRecoverableMultipartUpload upload;
    private long sizeBeforeCurrentPart;

    public OSSRecoverableFsDataOutputStream(long j, FunctionWithException<File, RefCountedFileWithStream, IOException> functionWithException, OSSRecoverableMultipartUpload oSSRecoverableMultipartUpload, long j2) throws IOException {
        this.ossUploadPartSize = j;
        this.cachedFileCreator = functionWithException;
        this.upload = oSSRecoverableMultipartUpload;
        if (oSSRecoverableMultipartUpload.getIncompletePart().isPresent()) {
            this.fileStream = RefCountedBufferingFileStream.restore(this.cachedFileCreator, oSSRecoverableMultipartUpload.getIncompletePart().get());
        } else {
            this.fileStream = RefCountedBufferingFileStream.openNew(this.cachedFileCreator);
        }
        this.sizeBeforeCurrentPart = j2;
    }

    public RecoverableWriter.ResumeRecoverable persist() throws IOException {
        lock();
        try {
            this.fileStream.flush();
            switchNewPartFileIfNecessary(this.ossUploadPartSize);
            return this.upload.getRecoverable(this.fileStream);
        } finally {
            unlock();
        }
    }

    public RecoverableFsDataOutputStream.Committer closeForCommit() throws IOException {
        lock();
        try {
            uploadCurrentPart();
            return this.upload.getCommitter();
        } finally {
            unlock();
        }
    }

    private void uploadCurrentPart() throws IOException {
        this.fileStream.flush();
        this.fileStream.close();
        if (this.fileStream.getPos() > 0) {
            this.upload.uploadPart(this.fileStream);
        }
        this.fileStream.release();
    }

    public long getPos() throws IOException {
        return this.sizeBeforeCurrentPart + this.fileStream.getPos();
    }

    public void write(int i) throws IOException {
        this.fileStream.write(i);
    }

    public void write(byte[] bArr, int i, int i2) throws IOException {
        this.fileStream.write(bArr, i, i2);
        switchNewPartFileIfNecessary(this.ossUploadPartSize);
    }

    public void flush() throws IOException {
        this.fileStream.flush();
        switchNewPartFileIfNecessary(this.ossUploadPartSize);
    }

    public void sync() throws IOException {
        this.fileStream.sync();
    }

    public void close() throws IOException {
        lock();
        try {
            this.fileStream.flush();
        } finally {
            IOUtils.closeQuietly(this.fileStream);
            this.fileStream.release();
            unlock();
        }
    }

    private void lock() throws IOException {
        try {
            this.lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted exception: " + e);
        }
    }

    private void unlock() {
        this.lock.unlock();
    }

    private void switchNewPartFileIfNecessary(long j) throws IOException {
        if (this.fileStream.getPos() >= j) {
            lock();
            try {
                this.sizeBeforeCurrentPart += this.fileStream.getPos();
                uploadCurrentPart();
                this.fileStream = RefCountedBufferingFileStream.openNew(this.cachedFileCreator);
                unlock();
            } catch (Throwable th) {
                unlock();
                throw th;
            }
        }
    }
}
