package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdds/scm/storage/BlockOutputStream.class */
public class BlockOutputStream extends OutputStream {
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) BlockOutputStream.class);
    private BlockID blockID;
    private final String key;
    private final String traceID;
    private final ContainerProtos.BlockData.Builder containerBlockData;
    private XceiverClientManager xceiverClientManager;
    private XceiverClientSpi xceiverClient;
    private final ContainerProtos.ChecksumType checksumType;
    private final int bytesPerChecksum;
    private int chunkSize;
    private final long streamBufferFlushSize;
    private final long streamBufferMaxSize;
    private final long watchTimeout;
    private BufferPool bufferPool;
    private final String streamId = UUID.randomUUID().toString();
    private int chunkIndex = 0;
    private ExecutorService responseExecutor = Executors.newSingleThreadExecutor();
    private ConcurrentSkipListMap<Long, List<ByteBuffer>> commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
    private long totalAckDataLength = 0;
    private ConcurrentHashMap<Long, CompletableFuture<ContainerProtos.ContainerCommandResponseProto>> futureMap = new ConcurrentHashMap<>();
    private long totalDataFlushedLength = 0;
    private long writtenDataLength = 0;
    private List<DatanodeDetails> failedServers = Collections.emptyList();
    private List<ByteBuffer> bufferList = null;
    private AtomicReference<IOException> ioException = new AtomicReference<>(null);

    public BlockOutputStream(BlockID blockID, String str, XceiverClientManager xceiverClientManager, Pipeline pipeline, String str2, int i, long j, long j2, long j3, BufferPool bufferPool, ContainerProtos.ChecksumType checksumType, int i2) throws IOException {
        this.blockID = blockID;
        this.key = str;
        this.traceID = str2;
        this.chunkSize = i;
        this.containerBlockData = ContainerProtos.BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()).addMetadata(ContainerProtos.KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build());
        this.xceiverClientManager = xceiverClientManager;
        this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
        this.streamBufferFlushSize = j;
        this.streamBufferMaxSize = j2;
        this.watchTimeout = j3;
        this.bufferPool = bufferPool;
        this.checksumType = checksumType;
        this.bytesPerChecksum = i2;
    }

    public BlockID getBlockID() {
        return this.blockID;
    }

    public long getTotalAckDataLength() {
        return this.totalAckDataLength;
    }

    public long getWrittenDataLength() {
        return this.writtenDataLength;
    }

    public List<DatanodeDetails> getFailedServers() {
        return this.failedServers;
    }

    @VisibleForTesting
    public XceiverClientSpi getXceiverClient() {
        return this.xceiverClient;
    }

    @VisibleForTesting
    public long getTotalDataFlushedLength() {
        return this.totalDataFlushedLength;
    }

    @VisibleForTesting
    public BufferPool getBufferPool() {
        return this.bufferPool;
    }

    public IOException getIoException() {
        return this.ioException.get();
    }

    @VisibleForTesting
    public Map<Long, List<ByteBuffer>> getCommitIndex2flushedDataMap() {
        return this.commitIndex2flushedDataMap;
    }

    @Override // java.io.OutputStream
    public void write(int i) throws IOException {
        checkOpen();
        write(new byte[]{(byte) i}, 0, 1);
    }

    @Override // java.io.OutputStream
    public void write(byte[] bArr, int i, int i2) throws IOException {
        checkOpen();
        if (bArr == null) {
            throw new NullPointerException();
        }
        if (i < 0 || i > bArr.length || i2 < 0 || i + i2 > bArr.length || i + i2 < 0) {
            throw new IndexOutOfBoundsException();
        }
        if (i2 == 0) {
            return;
        }
        while (i2 > 0) {
            ByteBuffer allocateBufferIfNeeded = this.bufferPool.allocateBufferIfNeeded();
            int min = Math.min(this.chunkSize - (allocateBufferIfNeeded.position() % this.chunkSize), i2);
            allocateBufferIfNeeded.put(bArr, i, min);
            if (!allocateBufferIfNeeded.hasRemaining()) {
                writeChunk(allocateBufferIfNeeded);
            }
            i += min;
            i2 -= min;
            this.writtenDataLength += min;
            if (shouldFlush()) {
                updateFlushLength();
                executePutBlock();
            }
            if (isBufferPoolFull()) {
                handleFullBuffer();
            }
        }
    }

    private boolean shouldFlush() {
        return this.bufferPool.computeBufferData() % this.streamBufferFlushSize == 0;
    }

    private void updateFlushLength() {
        this.totalDataFlushedLength += this.writtenDataLength - this.totalDataFlushedLength;
    }

    private boolean isBufferPoolFull() {
        return this.bufferPool.computeBufferData() == this.streamBufferMaxSize;
    }

    public void writeOnRetry(long j) throws IOException {
        if (j == 0) {
            return;
        }
        int i = 0;
        Preconditions.checkArgument(j <= this.streamBufferMaxSize);
        while (j > 0) {
            long min = Math.min(this.chunkSize, j);
            if (min == this.chunkSize) {
                writeChunk(this.bufferPool.getBuffer(i));
            }
            j -= min;
            i++;
            this.writtenDataLength += min;
            if (this.writtenDataLength % this.streamBufferFlushSize == 0) {
                updateFlushLength();
                executePutBlock();
            }
            if (this.writtenDataLength == this.streamBufferMaxSize) {
                handleFullBuffer();
            }
        }
    }

    private void updateFlushIndex(List<Long> list) {
        Preconditions.checkArgument(!this.commitIndex2flushedDataMap.isEmpty());
        Iterator<Long> it = list.iterator();
        while (it.hasNext()) {
            long longValue = it.next().longValue();
            Preconditions.checkState(this.commitIndex2flushedDataMap.containsKey(Long.valueOf(longValue)));
            List<ByteBuffer> remove = this.commitIndex2flushedDataMap.remove(Long.valueOf(longValue));
            this.totalAckDataLength += remove.stream().mapToLong(byteBuffer -> {
                int position = byteBuffer.position();
                Preconditions.checkArgument(position <= this.chunkSize);
                return position;
            }).sum();
            LOG.debug("Total data successfully replicated: " + this.totalAckDataLength);
            this.futureMap.remove(Long.valueOf(this.totalAckDataLength));
            Iterator<ByteBuffer> it2 = remove.iterator();
            while (it2.hasNext()) {
                this.bufferPool.releaseBuffer(it2.next());
            }
        }
    }

    private void handleFullBuffer() throws IOException {
        try {
            checkOpen();
            if (!this.futureMap.isEmpty()) {
                waitOnFlushFutures();
            }
            if (this.commitIndex2flushedDataMap.isEmpty()) {
                return;
            }
            watchForCommit(this.commitIndex2flushedDataMap.keySet().stream().mapToLong(l -> {
                return l.longValue();
            }).min().getAsLong());
        } catch (InterruptedException | ExecutionException e) {
            setIoException(e);
            adjustBuffersOnException();
            throw getIoException();
        }
    }

    private void adjustBuffers(long j) {
        List<Long> list = (List) this.commitIndex2flushedDataMap.keySet().stream().filter(l -> {
            return l.longValue() <= j;
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return;
        }
        updateFlushIndex(list);
    }

    private void adjustBuffersOnException() {
        adjustBuffers(this.xceiverClient.getReplicatedMinCommitIndex());
    }

    private void watchForCommit(long j) throws IOException {
        long logIndex;
        checkOpen();
        Preconditions.checkState(!this.commitIndex2flushedDataMap.isEmpty());
        try {
            XceiverClientReply watchForCommit = this.xceiverClient.watchForCommit(j, this.watchTimeout);
            if (watchForCommit == null) {
                logIndex = 0;
            } else {
                List<DatanodeDetails> datanodes = watchForCommit.getDatanodes();
                if (!datanodes.isEmpty()) {
                    if (this.failedServers.isEmpty()) {
                        this.failedServers = new ArrayList();
                    }
                    this.failedServers.addAll(datanodes);
                }
                logIndex = watchForCommit.getLogIndex();
            }
            adjustBuffers(logIndex);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.warn("watchForCommit failed for index " + j, (Throwable) e);
            setIoException(e);
            adjustBuffersOnException();
            throw getIoException();
        }
    }

    private CompletableFuture<ContainerProtos.ContainerCommandResponseProto> executePutBlock() throws IOException {
        checkOpen();
        long j = this.totalDataFlushedLength;
        Preconditions.checkNotNull(this.bufferList);
        List<ByteBuffer> list = this.bufferList;
        this.bufferList = null;
        Preconditions.checkNotNull(list);
        try {
            XceiverClientReply putBlockAsync = ContainerProtocolCalls.putBlockAsync(this.xceiverClient, this.containerBlockData.build(), this.traceID + ContainerProtos.Type.PutBlock + this.chunkIndex + this.blockID);
            CompletableFuture<ContainerProtos.ContainerCommandResponseProto> exceptionally = putBlockAsync.getResponse().thenApplyAsync(containerCommandResponseProto -> {
                try {
                    validateResponse(containerCommandResponseProto);
                    if (getIoException() == null) {
                        BlockID fromProtobuf = BlockID.getFromProtobuf(containerCommandResponseProto.getPutBlock().getCommittedBlockLength().getBlockID());
                        Preconditions.checkState(this.blockID.getContainerBlockID().equals(fromProtobuf.getContainerBlockID()));
                        this.blockID = fromProtobuf;
                        LOG.debug("Adding index " + putBlockAsync.getLogIndex() + " commitMap size " + this.commitIndex2flushedDataMap.size() + " flushLength " + j + " numBuffers " + list.size() + " blockID " + this.blockID + " bufferPool size" + this.bufferPool.getSize() + " currentBufferIndex " + this.bufferPool.getCurrentBufferIndex());
                        this.commitIndex2flushedDataMap.put(Long.valueOf(putBlockAsync.getLogIndex()), list);
                    }
                    return containerCommandResponseProto;
                } catch (IOException e) {
                    throw new CompletionException(e);
                }
            }, (Executor) this.responseExecutor).exceptionally((Function<Throwable, ? extends U>) th -> {
                LOG.debug("putBlock failed for blockID " + this.blockID + " with exception " + th.getLocalizedMessage());
                CompletionException completionException = new CompletionException(th);
                setIoException(completionException);
                throw completionException;
            });
            this.futureMap.put(Long.valueOf(j), exceptionally);
            return exceptionally;
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new IOException("Unexpected Storage Container Exception: " + e.toString(), e);
        }
    }

    @Override // java.io.OutputStream, java.io.Flushable
    public void flush() throws IOException {
        if (this.xceiverClientManager == null || this.xceiverClient == null || this.bufferPool == null || this.bufferPool.getSize() <= 0) {
            return;
        }
        try {
            handleFlush();
        } catch (InterruptedException | ExecutionException e) {
            setIoException(e);
            adjustBuffersOnException();
            throw getIoException();
        }
    }

    private void writeChunk(ByteBuffer byteBuffer) throws IOException {
        if (this.bufferList == null) {
            this.bufferList = new ArrayList();
        }
        this.bufferList.add(byteBuffer);
        ByteBuffer duplicate = byteBuffer.duplicate();
        duplicate.position(0);
        duplicate.limit(byteBuffer.position());
        writeChunkToContainer(duplicate);
    }

    private void handleFlush() throws IOException, InterruptedException, ExecutionException {
        checkOpen();
        if (this.totalDataFlushedLength < this.writtenDataLength) {
            ByteBuffer currentBuffer = this.bufferPool.getCurrentBuffer();
            Preconditions.checkArgument(currentBuffer.position() > 0);
            if (currentBuffer.position() != this.chunkSize) {
                writeChunk(currentBuffer);
            }
            updateFlushLength();
            executePutBlock();
        }
        waitOnFlushFutures();
        if (!this.commitIndex2flushedDataMap.isEmpty()) {
            long asLong = this.commitIndex2flushedDataMap.keySet().stream().mapToLong(l -> {
                return l.longValue();
            }).max().getAsLong();
            LOG.debug("waiting for last flush Index " + asLong + " to catch up");
            watchForCommit(asLong);
        }
        checkOpen();
    }

    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.xceiverClientManager == null || this.xceiverClient == null || this.bufferPool == null) {
            return;
        }
        try {
            if (this.bufferPool.getSize() > 0) {
                try {
                    handleFlush();
                    cleanup(false);
                } catch (InterruptedException | ExecutionException e) {
                    setIoException(e);
                    adjustBuffersOnException();
                    throw getIoException();
                }
            }
        } catch (Throwable th) {
            cleanup(false);
            throw th;
        }
    }

    private void waitOnFlushFutures() throws InterruptedException, ExecutionException {
        CompletableFuture.allOf((CompletableFuture[]) this.futureMap.values().toArray(new CompletableFuture[this.futureMap.size()])).get();
    }

    private void validateResponse(ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto) throws IOException {
        try {
            IOException ioException = getIoException();
            if (ioException != null) {
                throw ioException;
            }
            ContainerProtocolCalls.validateContainerResponse(containerCommandResponseProto);
        } catch (StorageContainerException e) {
            LOG.error("Unexpected Storage Container Exception: ", (Throwable) e);
            setIoException(e);
            throw e;
        }
    }

    private void setIoException(Exception exc) {
        if (getIoException() == null) {
            this.ioException.compareAndSet(null, new IOException("Unexpected Storage Container Exception: " + exc.toString(), exc));
        }
    }

    public void cleanup(boolean z) {
        if (this.xceiverClientManager != null) {
            this.xceiverClientManager.releaseClient(this.xceiverClient, z);
        }
        this.xceiverClientManager = null;
        this.xceiverClient = null;
        if (this.futureMap != null) {
            this.futureMap.clear();
        }
        this.futureMap = null;
        if (this.bufferList != null) {
            this.bufferList.clear();
        }
        this.bufferList = null;
        if (this.commitIndex2flushedDataMap != null) {
            this.commitIndex2flushedDataMap.clear();
        }
        this.commitIndex2flushedDataMap = null;
        this.responseExecutor.shutdown();
    }

    private void checkOpen() throws IOException {
        if (this.xceiverClient == null) {
            throw new IOException("BlockOutputStream has been closed.");
        }
        if (getIoException() != null) {
            adjustBuffersOnException();
            throw getIoException();
        }
    }

    private void writeChunkToContainer(ByteBuffer byteBuffer) throws IOException {
        int remaining = byteBuffer.remaining();
        ByteString copyFrom = ByteString.copyFrom(byteBuffer);
        ChecksumData computeChecksum = new Checksum(this.checksumType, this.bytesPerChecksum).computeChecksum(copyFrom);
        ContainerProtos.ChunkInfo.Builder newBuilder = ContainerProtos.ChunkInfo.newBuilder();
        StringBuilder append = new StringBuilder().append(DigestUtils.md5Hex(this.key)).append("_stream_").append(this.streamId).append("_chunk_");
        int i = this.chunkIndex + 1;
        this.chunkIndex = i;
        ContainerProtos.ChunkInfo build = newBuilder.setChunkName(append.append(i).toString()).setOffset(0L).setLen(remaining).setChecksumData(computeChecksum.getProtoBufMessage()).build();
        try {
            CompletableFuture<ContainerProtos.ContainerCommandResponseProto> response = ContainerProtocolCalls.writeChunkAsync(this.xceiverClient, build, this.blockID, copyFrom, this.traceID + ContainerProtos.Type.WriteChunk + this.chunkIndex + build.getChunkName()).getResponse();
            response.thenApplyAsync(containerCommandResponseProto -> {
                try {
                    validateResponse(containerCommandResponseProto);
                } catch (IOException e) {
                    response.completeExceptionally(e);
                }
                return containerCommandResponseProto;
            }, (Executor) this.responseExecutor).exceptionally((Function<Throwable, ? extends U>) th -> {
                LOG.debug("writing chunk failed " + build.getChunkName() + " blockID " + this.blockID + " with exception " + th.getLocalizedMessage());
                CompletionException completionException = new CompletionException(th);
                setIoException(completionException);
                throw completionException;
            });
            LOG.debug("writing chunk " + build.getChunkName() + " blockID " + this.blockID + " length " + remaining);
            this.containerBlockData.addChunks(build);
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new IOException("Unexpected Storage Container Exception: " + e.toString(), e);
        }
    }
}
