package org.apache.hadoop.fs.azure;

import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.TimeZone;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.azure.StorageInterface;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/hadoop/fs/azure/BlockBlobAppendStream.class
 */
/* loaded from: input_file:hadoop-azure-2.8.1.jar:org/apache/hadoop/fs/azure/BlockBlobAppendStream.class */
public class BlockBlobAppendStream extends OutputStream {
    // Blob key this stream appends to; used in log messages and in worker thread names.
    private final String key;
    // Maximum number of bytes staged in outBuffer before a full block is handed off for upload.
    private final int bufferSize;
    // In-memory staging buffer for bytes written but not yet submitted as a block.
    private ByteArrayOutputStream outBuffer;
    // Wrapper around the block blob that receives the uploaded and committed blocks.
    private final StorageInterface.CloudBlockBlobWrapper blob;
    // Operation context passed through to every storage call made by this stream.
    private final OperationContext opContext;
    // True once the append lease has been released or could not be renewed; volatile because
    // it is written by the renewer thread and read by writers.
    private volatile boolean leaseFreed;
    // Blocks submitted for upload that have not yet been committed to the blob's block list.
    private final List<BlockEntry> uncommittedBlockEntries;
    // Sentinel meaning nextBlockCount has not been initialised yet.
    private static final int UNSET_BLOCKS_COUNT = -1;
    // Interval, in milliseconds, between append-lease renewals performed by AppendRenewer.
    private static final int LEASE_RENEWAL_PERIOD = 10000;
    // Number of failed attempts after which updateBlobAppendMetadata gives up and rethrows.
    private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
    // Back-off, in milliseconds, between failed metadata update attempts.
    private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
    // Blob metadata key holding the append-lease flag ("true"/"false").
    public static final String APPEND_LEASE = "append_lease";
    // Age, in milliseconds, up to which a recorded append lease is still honoured.
    public static final int APPEND_LEASE_TIMEOUT = 30000;
    // Blob metadata key holding the epoch-millisecond timestamp of the last lease update.
    public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
    // Number of attempts a WriteRequest makes to upload one block.
    private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
    // Back-off, in milliseconds, between block upload attempts.
    private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
    // Headroom kept below Integer.MAX_VALUE when setBlocksCount picks the random starting
    // sequence number; presumably the maximum number of blocks per blob -- TODO confirm.
    private static final int MAX_BLOCK_COUNT = 100000;
    // Executor running the WriteRequest upload tasks; created in initialize().
    private ThreadPoolExecutor ioThreadPool;
    // Counter used to give each upload worker thread a distinct name.
    private final AtomicInteger threadSequenceNumber;
    private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
    private static final String UTC_STR = "UTC";
    // Set by cleanup(); once true, writes are rejected and close() is a no-op.
    private boolean closed = false;
    // Set by initialize(); write and close fail until it is true.
    private boolean initialized = false;
    // Most recent failure recorded by an upload task or by cleanup(); rethrown from close().
    private volatile IOException lastError = null;
    // Sequence number from which the next block ID is derived; -1 until setBlocksCount() runs.
    private long nextBlockCount = -1;
    // Random source for the starting block sequence number.
    private final Random sequenceGenerator = new Random();

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/fs/azure/BlockBlobAppendStream$AppendRenewer.class
     */
    /* loaded from: input_file:hadoop-azure-2.8.1.jar:org/apache/hadoop/fs/azure/BlockBlobAppendStream$AppendRenewer.class */
    private class AppendRenewer implements Runnable {
        private AppendRenewer() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!BlockBlobAppendStream.this.leaseFreed) {
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    BlockBlobAppendStream.LOG.debug("Appender Renewer thread interrupted");
                    Thread.currentThread().interrupt();
                }
                Log.debug("Attempting to renew append lease on {}", BlockBlobAppendStream.this.key);
                try {
                    if (!BlockBlobAppendStream.this.leaseFreed && !BlockBlobAppendStream.this.updateBlobAppendMetadata(true, true)) {
                        BlockBlobAppendStream.LOG.error("Unable to re-acquire append lease on the Blob {} ", BlockBlobAppendStream.this.key);
                        BlockBlobAppendStream.this.leaseFreed = true;
                    }
                } catch (StorageException e2) {
                    BlockBlobAppendStream.LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} Error Code : {}", new Object[]{BlockBlobAppendStream.this.key, e2, e2.getErrorCode()});
                    BlockBlobAppendStream.this.leaseFreed = true;
                }
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/fs/azure/BlockBlobAppendStream$UploaderThreadFactory.class
     */
    /* loaded from: input_file:hadoop-azure-2.8.1.jar:org/apache/hadoop/fs/azure/BlockBlobAppendStream$UploaderThreadFactory.class */
    /**
     * Creates the worker threads of the upload pool and names each one
     * {@code BlockBlobAppendStream-<key>-<n>}, where {@code n} comes from the stream's
     * thread sequence counter.
     */
    class UploaderThreadFactory implements ThreadFactory {
        UploaderThreadFactory() {
        }

        /**
         * Wraps the given task in a new, named thread.
         *
         * @param runnable the task the new thread will run
         * @return the newly created thread, not yet started
         */
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            final Thread worker = new Thread(runnable);
            final int sequence = BlockBlobAppendStream.this.threadSequenceNumber.getAndIncrement();
            final String workerName = String.format("%s-%s-%d", BlockBlobAppendStream.THREAD_ID_PREFIX, BlockBlobAppendStream.this.key, sequence);
            worker.setName(workerName);
            return worker;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/fs/azure/BlockBlobAppendStream$WriteRequest.class
     */
    /* loaded from: input_file:hadoop-azure-2.8.1.jar:org/apache/hadoop/fs/azure/BlockBlobAppendStream$WriteRequest.class */
    /**
     * Upload task for a single block. It makes up to {@code MAX_BLOCK_UPLOAD_RETRIES} attempts,
     * waiting {@code BLOCK_UPLOAD_RETRY_INTERVAL} milliseconds between them. If every attempt
     * fails, the last failure is recorded in the stream's {@code lastError}.
     */
    public class WriteRequest implements Runnable {
        private final byte[] dataPayload;
        private final String blockId;

        /**
         * @param bArr the bytes of the block to upload
         * @param str the block ID under which the bytes are uploaded
         */
        public WriteRequest(byte[] bArr, String str) {
            this.dataPayload = bArr;
            this.blockId = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            IOException failure = null;
            for (int attempt = 1; attempt <= MAX_BLOCK_UPLOAD_RETRIES; attempt++) {
                try {
                    blob.uploadBlock(this.blockId, new ByteArrayInputStream(this.dataPayload), this.dataPayload.length, new BlobRequestOptions(), opContext);
                    return;
                } catch (Exception e) {
                    // Use the class's slf4j logger, consistent with the rest of the file.
                    LOG.debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, e);
                    failure = new IOException("Encountered Exception while uploading block", e);
                }
                // Back off only when another attempt follows; sleeping after the last
                // failure would just delay the error report.
                if (attempt < MAX_BLOCK_UPLOAD_RETRIES) {
                    try {
                        Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            // Reached only when every attempt failed.
            lastError = failure;
        }
    }

    public BlockBlobAppendStream(StorageInterface.CloudBlockBlobWrapper cloudBlockBlobWrapper, String str, int i, OperationContext operationContext) throws IOException {
        if (null == str || 0 == str.length()) {
            throw new IllegalArgumentException("Illegal argument: The key string is null or empty");
        }
        if (0 >= i) {
            throw new IllegalArgumentException("Illegal argument bufferSize cannot be zero or negative");
        }
        this.blob = cloudBlockBlobWrapper;
        this.opContext = operationContext;
        this.key = str;
        this.bufferSize = i;
        this.threadSequenceNumber = new AtomicInteger(0);
        setBlocksCount();
        this.outBuffer = new ByteArrayOutputStream(i);
        this.uncommittedBlockEntries = new ArrayList();
        try {
            if (updateBlobAppendMetadata(true, false)) {
                this.leaseFreed = false;
            } else {
                LOG.error("Unable to set Append Lease on the Blob : {} Possibly because another client already has a create or append stream open on the Blob", this.key);
                throw new IOException("Unable to set Append lease on the Blob. Possibly because another client already had an append stream open on the Blob.");
            }
        } catch (StorageException e) {
            LOG.error("Encountered Storage exception while acquiring append lease on blob : {}. Storage Exception : {} ErrorCode : {}", new Object[]{this.key, e, e.getErrorCode()});
            throw new IOException((Throwable) e);
        }
    }

    /**
     * Starts the daemon lease-renewer thread and creates the four-thread upload pool.
     * Calls after the first one are no-ops.
     */
    public synchronized void initialize() {
        if (!this.initialized) {
            final Thread leaseRenewer = new Thread(new AppendRenewer());
            leaseRenewer.setName(String.format("%s-AppendLeaseRenewer", this.key));
            leaseRenewer.setDaemon(true);
            leaseRenewer.start();

            final UploaderThreadFactory workerFactory = new UploaderThreadFactory();
            this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), workerFactory);
            this.initialized = true;
        }
    }

    /**
     * @return the blob key this stream was created with
     */
    public String getKey() {
        return key;
    }

    /**
     * @return the buffer size, in bytes, this stream was created with
     */
    public int getBufferSize() {
        return bufferSize;
    }

    @Override // java.io.OutputStream
    /**
     * Writes a single byte: the low-order eight bits of the argument.
     *
     * @param i the value whose low-order byte is written
     * @throws IOException if the underlying write fails
     */
    public void write(int i) throws IOException {
        final byte[] single = new byte[1];
        // The narrowing cast keeps exactly the low-order eight bits.
        single[0] = (byte) i;
        write(single);
    }

    @Override // java.io.OutputStream
    /**
     * Writes the whole array to the stream.
     *
     * @param bArr the bytes to write
     * @throws IOException if the underlying write fails
     */
    public void write(byte[] bArr) throws IOException {
        final int length = bArr.length;
        write(bArr, 0, length);
    }

    @Override // java.io.OutputStream
    /**
     * Writes {@code i2} bytes of {@code bArr}, starting at offset {@code i}.
     *
     * @param bArr the source array
     * @param i the offset of the first byte to write
     * @param i2 the number of bytes to write
     * @throws IndexOutOfBoundsException if the offset or length is negative, or the range
     *         extends past the end of the array
     * @throws IOException if the stream is uninitialized, closed, or no longer holds the lease
     */
    public void write(byte[] bArr, int i, int i2) throws IOException {
        final boolean rangeIsValid = i >= 0 && i2 >= 0 && i2 <= bArr.length - i;
        if (!rangeIsValid) {
            throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
        }
        writeInternal(bArr, i, i2);
    }

    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    /**
     * Flushes any buffered bytes as a final block, waits up to ten minutes for outstanding
     * uploads, commits the uploaded blocks and releases the append lease.
     *
     * <p>Calling close on an already closed stream is a no-op. Once the uploads have
     * drained, the append lease is released even if the commit fails, so a failed close
     * does not leave the blob marked as leased with the renewer thread still running.
     *
     * @throws IOException if the stream was never initialized, no longer holds the lease,
     *         the wait for uploads timed out or was interrupted, a block upload failed,
     *         or the commit or lease release failed
     */
    public synchronized void close() throws IOException {
        if (!this.initialized) {
            throw new IOException("Trying to close an uninitialized Append stream");
        }
        if (this.closed) {
            return;
        }
        if (this.leaseFreed) {
            throw new IOException(String.format("Attempting to close an append stream on blob : %s  that does not have lease on the Blob. Failing close", this.key));
        }
        if (this.outBuffer.size() > 0) {
            uploadBlockToStorage(this.outBuffer.toByteArray());
        }
        this.ioThreadPool.shutdown();
        try {
            if (!this.ioThreadPool.awaitTermination(10L, TimeUnit.MINUTES)) {
                LOG.error("Time out occured while waiting for IO request to finish in append for blob : {}", this.key);
                NativeAzureFileSystemHelper.logAllLiveStackTraces();
                throw new IOException("Timed out waiting for IO requests to finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Upload block operation in append interrupted for blob {}. Failing close", this.key);
            // Keep the interrupt as the cause so the origin of the failure is not lost.
            throw new IOException("Append Commit interrupted.", e);
        }

        try {
            // Commit only when every block upload succeeded.
            if (this.lastError == null) {
                commitAppendBlocks();
            }
        } finally {
            // Always mark the stream closed and release the append lease; otherwise a
            // failed commit leaves the renewer thread refreshing the lease forever.
            cleanup();
        }

        // Read after cleanup(): releasing the lease may itself record a failure.
        final IOException failure = this.lastError;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Marks the stream closed and clears the append-lease flag in the blob metadata.
     * A storage failure while clearing the flag is recorded in {@code lastError} rather
     * than thrown; the lease is marked as freed in either case.
     */
    private synchronized void cleanup() {
        closed = true;
        try {
            updateBlobAppendMetadata(false, true);
        } catch (StorageException storageFailure) {
            final Object[] logArgs = {key, storageFailure, storageFailure.getErrorCode()};
            LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} Error Code : {}", logArgs);
            lastError = new IOException(storageFailure);
        }
        leaseFreed = true;
    }

    /**
     * Commits every uploaded-but-uncommitted block by appending it to the blob's current
     * committed block list. The commit is done under a blob lease, which is released
     * exactly once on every exit path.
     *
     * @throws IOException if a storage operation fails while acquiring the lease, reading
     *         the committed block list, or committing the new list
     */
    private synchronized void commitAppendBlocks() throws IOException {
        if (this.uncommittedBlockEntries.isEmpty()) {
            return;
        }
        SelfRenewingLease lease = null;
        try {
            // Hold a lease so the block list cannot change between download and commit.
            lease = new SelfRenewingLease(this.blob);
            List<BlockEntry> blockEntries = this.blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), this.opContext);
            blockEntries.addAll(this.uncommittedBlockEntries);
            AccessCondition accessCondition = new AccessCondition();
            accessCondition.setLeaseID(lease.getLeaseID());
            this.blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), this.opContext);
            this.uncommittedBlockEntries.clear();
        } catch (StorageException e) {
            LOG.error("Storage exception encountered during block commit phase of append for blob : {} Storage Exception : {} Error Code: {}", new Object[]{this.key, e, e.getErrorCode()});
            throw new IOException("Encountered Exception while committing append blocks", e);
        } finally {
            if (lease != null) {
                try {
                    lease.free();
                } catch (StorageException releaseFailure) {
                    // Best effort: a failed release must not mask the commit outcome.
                    LOG.debug("Exception encountered while releasing lease for blob : {} StorageException : {} ErrorCode : {}", new Object[]{this.key, releaseFailure, releaseFailure.getErrorCode()});
                }
            }
        }
    }

    /**
     * Initialises {@code nextBlockCount}, if it is still unset, to a random starting
     * sequence number plus the number of blocks already committed to the blob.
     *
     * <p>The two random ints are summed as {@code long}: their sum can exceed
     * {@code Integer.MAX_VALUE}, so adding them as ints would wrap to a negative value
     * and could even collide with the unset sentinel.
     *
     * @throws IOException if the committed block list cannot be downloaded
     */
    private void setBlocksCount() throws IOException {
        if (this.nextBlockCount != UNSET_BLOCKS_COUNT) {
            return;
        }
        try {
            // Widen before adding so the sum cannot overflow int.
            long randomStart = (long) this.sequenceGenerator.nextInt(Integer.MAX_VALUE)
                    + this.sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
            int committedBlocks = this.blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), this.opContext).size();
            this.nextBlockCount = randomStart + committedBlocks;
        } catch (StorageException e) {
            LOG.debug("Encountered storage exception during setting next Block Count. StorageException : {} ErrorCode : {}", e, e.getErrorCode());
            throw new IOException(e);
        }
    }

    /**
     * Builds the block ID for the current sequence number: the eight big-endian bytes of
     * {@code nextBlockCount}, Base64-encoded.
     *
     * @return the Base64 block ID
     * @throws IOException if {@code nextBlockCount} still holds the unset value -1
     */
    private String generateBlockId() throws IOException {
        final long blockSequence = this.nextBlockCount;
        if (blockSequence != -1) {
            final byte[] encoded = Base64.encodeBase64(getBytesFromLong(blockSequence));
            return new String(encoded, StandardCharsets.UTF_8);
        }
        throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
    }

    /**
     * Serialises a long into eight bytes, most significant byte first.
     *
     * @param j the value to serialise
     * @return a new eight-byte array holding {@code j} in big-endian order
     */
    private static byte[] getBytesFromLong(long j) {
        final byte[] bigEndian = new byte[8];
        long remaining = j;
        // Fill from the last slot backwards, peeling off the low byte each time.
        for (int slot = bigEndian.length - 1; slot >= 0; slot--) {
            bigEndian[slot] = (byte) remaining;
            remaining >>>= 8;
        }
        return bigEndian;
    }

    /**
     * Assigns the next block ID to the given bytes, records the block as uncommitted and
     * queues it on the upload pool.
     *
     * @param bArr the bytes of the block to upload
     * @throws IOException if a block ID cannot be generated
     */
    private synchronized void uploadBlockToStorage(byte[] bArr) throws IOException {
        ++this.nextBlockCount;
        final String blockId = generateBlockId();
        final BlockEntry pendingEntry = new BlockEntry(blockId);
        final WriteRequest uploadTask = new WriteRequest(bArr, blockId);
        this.uncommittedBlockEntries.add(pendingEntry);
        this.ioThreadPool.execute(uploadTask);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes the append-lease flag and a fresh timestamp into the blob's metadata while
     * holding a blob lease.
     *
     * <p>The update is refused (returns {@code false}) when the metadata already carries an
     * append-lease entry that was modified within {@code APPEND_LEASE_TIMEOUT} milliseconds
     * and whose stored value differs from {@code z2}. Storage failures are retried up to
     * {@code MAX_LEASE_RENEWAL_RETRY_COUNT} times with a short back-off. The back-off happens
     * outside the stream monitor so writers are not blocked while waiting.
     *
     * @param z the value to store under the append-lease metadata key
     * @param z2 the append-lease value the caller expects to find; a recent entry with a
     *        different value causes the update to be refused
     * @return {@code true} if the metadata was updated, {@code false} if the update was refused
     * @throws StorageException the last storage failure, once all attempts have failed
     */
    public boolean updateBlobAppendMetadata(boolean z, boolean z2) throws StorageException {
        StorageException lastFailure = null;
        int failedAttempts = 0;
        while (failedAttempts < MAX_LEASE_RENEWAL_RETRY_COUNT) {
            synchronized (this) {
                SelfRenewingLease lease = null;
                try {
                    // Epoch milliseconds are time-zone independent, so no Calendar is needed.
                    long now = System.currentTimeMillis();
                    lease = new SelfRenewingLease(this.blob);
                    this.blob.downloadAttributes(this.opContext);
                    HashMap<String, String> metadata = this.blob.getMetadata();
                    if (metadata.containsKey(APPEND_LEASE)
                            && now - Long.parseLong(metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= APPEND_LEASE_TIMEOUT
                            && !metadata.get(APPEND_LEASE).equals(Boolean.toString(z2))) {
                        // A recent lease entry with an unexpected value: do not overwrite it.
                        return false;
                    }
                    metadata.put(APPEND_LEASE, Boolean.toString(z));
                    metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(now));
                    this.blob.setMetadata(metadata);
                    AccessCondition accessCondition = new AccessCondition();
                    accessCondition.setLeaseID(lease.getLeaseID());
                    this.blob.uploadMetadata(accessCondition, null, this.opContext);
                    return true;
                } catch (StorageException e) {
                    LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} Error Code : {}", new Object[]{this.key, e, e.getErrorCode()});
                    failedAttempts++;
                    lastFailure = e;
                } finally {
                    // Single release point for every exit path of the attempt.
                    if (lease != null) {
                        try {
                            lease.free();
                        } catch (StorageException releaseFailure) {
                            LOG.debug("Encountered Storage exception while releasing lease for Blob {} during Append  metadata operation. Storage Exception {} Error Code : {} ", new Object[]{this.key, releaseFailure, releaseFailure.getErrorCode()});
                        }
                    }
                }
            }
            if (failedAttempts == MAX_LEASE_RENEWAL_RETRY_COUNT) {
                throw lastFailure;
            }
            try {
                Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
            } catch (InterruptedException interrupted) {
                LOG.debug("Blob append metadata updated method interrupted");
                Thread.currentThread().interrupt();
            }
        }
        return false;
    }

    /**
     * Appends bytes to the staging buffer. Whenever the buffer would exceed
     * {@code bufferSize}, a block of exactly {@code bufferSize} bytes is cut and queued
     * for upload.
     *
     * <p>The input is consumed through a moving offset, so each byte is copied once into
     * a block or into the staging buffer; the remaining tail is not re-copied per block.
     * Blocks handed to the upload pool are always fresh arrays, so the caller may reuse
     * {@code bArr} as soon as this method returns.
     *
     * @param bArr the source array
     * @param i the offset of the first byte to write
     * @param i2 the number of bytes to write
     * @throws IOException if the stream is uninitialized, closed, or no longer holds the lease
     */
    private synchronized void writeInternal(byte[] bArr, int i, int i2) throws IOException {
        if (!this.initialized) {
            throw new IOException("Trying to write to an un-initialized Append stream");
        }
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
        if (this.leaseFreed) {
            throw new IOException("Write called on a append stream not holding lease. Failing Write");
        }
        int offset = i;
        int remaining = i2;
        while (this.outBuffer.size() + remaining > this.bufferSize) {
            int buffered = this.outBuffer.size();
            // Bytes needed to top the staged data up to one full block; always less than
            // 'remaining' because of the loop condition.
            int fill = this.bufferSize - buffered;
            byte[] block = new byte[this.bufferSize];
            System.arraycopy(this.outBuffer.toByteArray(), 0, block, 0, buffered);
            System.arraycopy(bArr, offset, block, buffered, fill);
            uploadBlockToStorage(block);
            offset += fill;
            remaining -= fill;
            // Replace the buffer only after the block was queued, so a failed hand-off
            // leaves the staged bytes intact.
            this.outBuffer = new ByteArrayOutputStream(this.bufferSize);
        }
        this.outBuffer.write(bArr, offset, remaining);
    }
}
