package org.apache.hadoop.hdds.scm.storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.protocol.DataStreamReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.class */
public class BlockDataStreamOutput implements ByteBufferStreamOutput {
    // ---- Constants and shared statics ----

    /** Upper bound passed to {@link #getProtoLength} when sending the closing putBlock request. */
    public static final int PUT_BLOCK_REQUEST_LENGTH_MAX = 1048576;
    /** Prefix used when wrapping failures into an {@link IOException}. */
    public static final String EXCEPTION_MSG = "Unexpected Storage Container Exception: ";
    public static final Logger LOG = LoggerFactory.getLogger(BlockDataStreamOutput.class);
    /** Typed empty array handed to {@code toArray} before calling {@code CompletableFuture.allOf}. */
    private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = new CompletableFuture[0];

    // ---- Block identity and protocol state ----

    private AtomicReference<BlockID> blockID;
    /** Accumulates the chunk list that is sent with each putBlock. */
    private final ContainerProtos.BlockData.Builder containerBlockData;
    private final Token<? extends TokenIdentifier> token;
    private final Checksum checksum;
    /** Running byte offset assigned to the next chunk. */
    private final AtomicLong chunkOffset = new AtomicLong();
    /** Incremented per chunk and used as the suffix of the chunk name. */
    private int chunkIndex;

    // ---- Client, stream and configuration ----

    private XceiverClientFactory xceiverClientFactory;
    private XceiverClientRatis xceiverClient;
    private OzoneClientConfig config;
    private XceiverClientMetrics metrics;
    private final DataStreamOutput out;
    private CompletableFuture<DataStreamReply> dataStreamCloseReply;
    private boolean isDatastreamPipelineMode;
    private int flushPeriod;

    // ---- Buffers ----

    private List<StreamBuffer> bufferList;
    /** Buffers written since the last putBlock; handed over (and reset to null) when a putBlock is issued. */
    private List<StreamBuffer> buffersForPutBlock;
    /** Buffer currently being filled by {@code write}; null when a new one must be allocated. */
    private StreamBuffer currentBuffer;

    // ---- Progress, failure tracking and asynchronous bookkeeping ----

    private long totalDataFlushedLength;
    private long writtenDataLength;
    /** First failure recorded by any request; checked by {@code checkOpen}. */
    private final AtomicReference<IOException> ioException;
    private final List<DatanodeDetails> failedServers;
    private final StreamCommitWatcher commitWatcher;
    /** Executor on which stream-write and putBlock completion callbacks run. */
    private final ExecutorService responseExecutor;
    private Queue<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>> putBlockFutures = new LinkedList<>();
    private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();

    // ---- Sync settings (syncSize is fixed at 0 in this build) ----

    private final long syncSize = 0;
    private long syncPosition = 0;

    public BlockDataStreamOutput(BlockID blockID, XceiverClientFactory xceiverClientFactory, Pipeline pipeline, OzoneClientConfig ozoneClientConfig, Token<? extends TokenIdentifier> token, List<StreamBuffer> list) throws IOException {
        this.xceiverClientFactory = xceiverClientFactory;
        this.config = ozoneClientConfig;
        this.isDatastreamPipelineMode = ozoneClientConfig.isDatastreamPipelineMode();
        this.blockID = new AtomicReference<>(blockID);
        this.containerBlockData = ContainerProtos.BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()).addMetadata(ContainerProtos.KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build());
        this.xceiverClient = (XceiverClientRatis) xceiverClientFactory.acquireClient(pipeline, true);
        this.token = token;
        this.out = setupStream(pipeline);
        this.bufferList = list;
        this.flushPeriod = (int) (ozoneClientConfig.getStreamBufferFlushSize() / ozoneClientConfig.getStreamBufferSize());
        Preconditions.checkArgument(((long) this.flushPeriod) * ((long) ozoneClientConfig.getStreamBufferSize()) == ozoneClientConfig.getStreamBufferFlushSize());
        this.responseExecutor = Executors.newSingleThreadExecutor();
        this.commitWatcher = new StreamCommitWatcher(this.xceiverClient, list);
        this.totalDataFlushedLength = 0L;
        this.writtenDataLength = 0L;
        this.failedServers = new ArrayList(0);
        this.ioException = new AtomicReference<>(null);
        this.checksum = new Checksum(ozoneClientConfig.getChecksumType(), ozoneClientConfig.getBytesPerChecksum());
        this.metrics = XceiverClientManager.getXceiverClientMetrics();
    }

    /**
     * Opens the Ratis data stream for this block by sending a {@code StreamInit}
     * request header, with a routing table when pipeline mode is enabled.
     *
     * @param pipeline pipeline whose first node is named in the header
     * @return the opened data stream
     * @throws IOException if building the header fails
     */
    private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
        final BlockID id = blockID.get();
        final ContainerProtos.ContainerCommandRequestProto.Builder streamInit =
                ContainerProtos.ContainerCommandRequestProto.newBuilder()
                        .setCmdType(ContainerProtos.Type.StreamInit)
                        .setContainerID(id.getContainerID())
                        .setDatanodeUuid(pipeline.getFirstNode().getUuidString())
                        .setWriteChunk(ContainerProtos.WriteChunkRequestProto.newBuilder()
                                .setBlockID(id.getDatanodeBlockIDProtobuf()));
        if (token != null) {
            streamInit.setEncodedToken(token.encodeToUrlString());
        }
        final ContainerCommandRequestMessage message =
                ContainerCommandRequestMessage.toMessage(streamInit.build(), null);

        final DataStreamApi api = Preconditions.checkNotNull(xceiverClient.getDataStreamApi());
        final ByteBuffer header = message.getContent().asReadOnlyByteBuffer();
        if (isDatastreamPipelineMode) {
            return api.stream(header, RatisHelper.getRoutingTable(pipeline));
        }
        return api.stream(header);
    }

    /** @return the current value of the block ID reference. */
    public BlockID getBlockID() {
        return blockID.get();
    }

    /** @return the number of bytes accepted by {@code write} and {@code writeOnRetry} so far. */
    public long getWrittenDataLength() {
        return writtenDataLength;
    }

    /**
     * @return the live (not copied) list of datanodes recorded as failed while
     *         watching for commit
     */
    public List<DatanodeDetails> getFailedServers() {
        return failedServers;
    }

    /** @return the Ratis client in use, or null once {@link #cleanup(boolean)} has run. */
    @VisibleForTesting
    public XceiverClientRatis getXceiverClient() {
        return xceiverClient;
    }

    /** @return the first recorded failure, or null if none has been recorded. */
    public IOException getIoException() {
        return ioException.get();
    }

    /**
     * Copies {@code i2} bytes of {@code byteBuffer}, starting at offset {@code i},
     * into the current packet buffer, sending a chunk each time a buffer fills up
     * and flushing when the configured boundaries are reached.
     *
     * @param byteBuffer source data; must not be null
     * @param i          offset of the first byte to write
     * @param i2         number of bytes to write; nothing happens when it is not positive
     * @throws IOException          if the stream is closed or an earlier request failed
     * @throws NullPointerException if {@code byteBuffer} is null
     */
    @Override
    public void write(ByteBuffer byteBuffer, int i, int i2) throws IOException {
        checkOpen();
        if (byteBuffer == null) {
            throw new NullPointerException();
        }
        int offset = i;
        int remaining = i2;
        while (remaining > 0) {
            allocateNewBufferIfNeeded();
            // Never copy more than what still fits into the current buffer.
            final int step = Math.min(remaining, currentBuffer.length());
            final StreamBuffer slice = new StreamBuffer(byteBuffer, offset, step);
            currentBuffer.put(slice);
            writeChunkIfNeeded();
            offset += step;
            writtenDataLength += step;
            remaining -= step;
            doFlushIfNeeded();
        }
    }

    /**
     * Sends the current buffer as a chunk once its {@code length()} reaches zero,
     * then drops the reference so the next write allocates a new buffer.
     */
    private void writeChunkIfNeeded() throws IOException {
        if (currentBuffer.length() != 0) {
            return;
        }
        writeChunk(currentBuffer);
        currentBuffer = null;
    }

    /**
     * Registers the buffer in both the overall buffer list and the pending
     * putBlock list, then streams its content from index 0 up to its current position.
     *
     * @param streamBuffer buffer whose written part is sent as one chunk
     */
    private void writeChunk(StreamBuffer streamBuffer) throws IOException {
        bufferList.add(streamBuffer);
        if (buffersForPutBlock == null) {
            buffersForPutBlock = new ArrayList<>();
        }
        buffersForPutBlock.add(streamBuffer);

        // Work on a duplicate so the original buffer's position and limit stay untouched.
        final ByteBuffer view = streamBuffer.duplicate();
        view.position(0);
        view.limit(streamBuffer.position());
        writeChunkToContainer(view);
    }

    /** Allocates a buffer of the configured minimum packet size when there is no current buffer. */
    private void allocateNewBufferIfNeeded() {
        if (currentBuffer != null) {
            return;
        }
        currentBuffer = StreamBuffer.allocate(config.getDataStreamMinPacketSize());
    }

    /**
     * Issues a putBlock whenever the number of buffered packets reaches a multiple
     * of the flush boundary. When the buffer list size equals the stream window,
     * waits for the oldest putBlock and then watches for commit on the first index.
     */
    private void doFlushIfNeeded() throws IOException {
        final long minPacketSize = config.getDataStreamMinPacketSize();
        // Both limits are expressed as a number of packets.
        final long flushBoundary = config.getDataStreamBufferFlushSize() / minPacketSize;
        final long streamWindow = config.getStreamWindowSize() / minPacketSize;

        final boolean atFlushBoundary =
                !bufferList.isEmpty() && bufferList.size() % flushBoundary == 0;
        final boolean hasPendingBuffers =
                buffersForPutBlock != null && !buffersForPutBlock.isEmpty();
        if (atFlushBoundary && hasPendingBuffers) {
            updateFlushLength();
            executePutBlock(false, false);
        }

        // Re-read the size: the putBlock above may have run in between.
        if (bufferList.size() != streamWindow) {
            return;
        }
        try {
            checkOpen();
            if (!putBlockFutures.isEmpty()) {
                putBlockFutures.remove().get();
            }
        } catch (ExecutionException failure) {
            handleExecutionException(failure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            handleInterruptedException(interrupted, true);
        }
        watchForCommit(true);
    }

    /** Marks everything written so far as flushed. */
    private void updateFlushLength() {
        totalDataFlushedLength = writtenDataLength;
    }

    /** @return the written length captured at the most recent flush-length update. */
    @VisibleForTesting
    public long getTotalDataFlushedLength() {
        return totalDataFlushedLength;
    }

    /**
     * Re-sends already buffered data, walking the buffer list from its start
     * until {@code j} bytes have been accounted for.
     *
     * @param j number of bytes to replay; zero is a no-op
     * @throws IOException if sending a chunk fails
     */
    public void writeOnRetry(long j) throws IOException {
        if (j == 0) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Retrying write length {} for blockID {}", j, blockID);
        }
        long pending = j;
        for (int index = 0; pending > 0; index++) {
            final StreamBuffer buffer = bufferList.get(index);
            final long replayed = Math.min(buffer.position(), pending);
            if (buffersForPutBlock == null) {
                buffersForPutBlock = new ArrayList<>();
            }
            buffersForPutBlock.add(buffer);

            // Send the written part of the buffer through an independent view.
            final ByteBuffer view = buffer.duplicate();
            view.position(0);
            view.limit(buffer.position());
            writeChunkToContainer(view);

            pending -= replayed;
            writtenDataLength += replayed;
        }
    }

    /**
     * Asks the commit watcher to watch on the first or the last index and records
     * any datanodes the reply lists as failed servers.
     *
     * @param z true to watch on the first index, false to watch on the last
     * @throws IOException if the stream is closed, or the watch fails (the failure
     *         is recorded before being rethrown)
     */
    private void watchForCommit(boolean z) throws IOException {
        checkOpen();
        try {
            final XceiverClientReply reply;
            if (z) {
                reply = commitWatcher.watchOnFirstIndex();
            } else {
                reply = commitWatcher.watchOnLastIndex();
            }
            if (reply == null) {
                return;
            }
            final List<DatanodeDetails> listedNodes = reply.getDatanodes();
            if (listedNodes.isEmpty()) {
                return;
            }
            LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}",
                    blockID, xceiverClient.getPipeline(), listedNodes);
            failedServers.addAll(listedNodes);
        } catch (IOException watchFailure) {
            setIoException(watchFailure);
            throw getIoException();
        }
    }

    /**
     * Sends a putBlock for the chunks accumulated so far and queues the response future.
     *
     * <p>When {@code z} is set, the putBlock request is additionally written to the
     * data stream with the CLOSE option, and a failed close reply triggers another
     * {@code executePutBlock(true, false)} from the completion callback.
     *
     * @param z  true when this putBlock closes the stream
     * @param z2 true to force a putBlock without handing buffers to the commit watcher
     * @throws IOException if the stream is closed, a pending chunk write failed, or
     *         the request cannot be submitted
     */
    private void executePutBlock(boolean z, boolean z2) throws IOException {
        checkOpen();
        final long flushPos = totalDataFlushedLength;
        final List<StreamBuffer> buffersToCommit;
        if (z2) {
            buffersToCommit = null;
        } else {
            Preconditions.checkNotNull(bufferList);
            // Take ownership of the pending buffers; later writes start a new list.
            buffersToCommit = buffersForPutBlock;
            buffersForPutBlock = null;
            Preconditions.checkNotNull(buffersToCommit);
        }
        // All chunk writes must have completed before the block metadata is sent.
        waitFuturesComplete();
        final ContainerProtos.BlockData blockData = containerBlockData.build();
        if (z) {
            dataStreamCloseReply = executePutBlockClose(
                    ContainerProtocolCalls.getPutBlockRequest(
                            xceiverClient.getPipeline(), blockData, true, token),
                    PUT_BLOCK_REQUEST_LENGTH_MAX,
                    out);
            dataStreamCloseReply.whenComplete((reply, failure) -> {
                if (failure == null && reply != null && reply.isSuccess()) {
                    return;
                }
                LOG.warn("Failed executePutBlockClose, reply=" + reply, failure);
                try {
                    executePutBlock(true, false);
                } catch (IOException e) {
                    throw new CompletionException(e);
                }
            });
        }
        try {
            final XceiverClientReply asyncReply =
                    ContainerProtocolCalls.putBlockAsync(xceiverClient, blockData, z, token);
            final CompletableFuture<ContainerProtos.ContainerCommandResponseProto> flushFuture =
                    asyncReply.getResponse().thenApplyAsync(response -> {
                        try {
                            validateResponse(response);
                            if (getIoException() == null && !z2) {
                                final BlockID responseBlockID = BlockID.getFromProtobuf(
                                        response.getPutBlock().getCommittedBlockLength().getBlockID());
                                Preconditions.checkState(blockID.get().getContainerBlockID()
                                        .equals(responseBlockID.getContainerBlockID()));
                                // Adopt the block ID reported by the datanode.
                                blockID.set(responseBlockID);
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Adding index {} commitMap size {} flushLength {} blockID {}",
                                            asyncReply.getLogIndex(),
                                            commitWatcher.getCommitIndexMap().size(),
                                            flushPos,
                                            blockID);
                                }
                                commitWatcher.updateCommitInfoMap(asyncReply.getLogIndex(), buffersToCommit);
                            }
                            return response;
                        } catch (IOException e) {
                            throw new CompletionException(e);
                        }
                    }, responseExecutor).exceptionally(failure -> {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("putBlock failed for blockID {} with exception {}",
                                    blockID, failure.getLocalizedMessage());
                        }
                        final CompletionException wrapped = new CompletionException(failure);
                        setIoException(wrapped);
                        throw wrapped;
                    });
            putBlockFutures.add(flushFuture);
        } catch (IOException | ExecutionException e) {
            throw new IOException(EXCEPTION_MSG + e.toString(), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            handleInterruptedException(e, false);
        }
    }

    /**
     * Writes the serialized putBlock request to the stream, followed by a 4-byte
     * length trailer sent with the CLOSE option.
     *
     * @param containerCommandRequestProto putBlock request to serialize
     * @param i                            maximum allowed serialized length
     * @param dataStreamOutput             stream to write to
     * @return the future of the trailer write (the one carrying CLOSE)
     */
    public static CompletableFuture<DataStreamReply> executePutBlockClose(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto, int i, DataStreamOutput dataStreamOutput) {
        final ByteBuffer putBlock = ContainerCommandRequestMessage
                .toMessage(containerCommandRequestProto, null)
                .getContent()
                .asReadOnlyByteBuffer();
        final ByteBuffer protoLength = getProtoLength(putBlock, i);

        RatisHelper.debug(putBlock, "putBlock", LOG);
        // The future of the payload write is not tracked; only the closing write is returned.
        dataStreamOutput.writeAsync(putBlock);

        RatisHelper.debug(protoLength, "protoLength", LOG);
        return dataStreamOutput.writeAsync(protoLength, StandardWriteOption.CLOSE);
    }

    /**
     * Encodes the number of remaining bytes of {@code byteBuffer} as a 4-byte,
     * read-only buffer.
     *
     * @param byteBuffer buffer whose remaining length is encoded
     * @param i          maximum allowed length
     * @return a read-only buffer holding the length as a single int
     * @throws IllegalStateException if the remaining length exceeds {@code i}
     */
    public static ByteBuffer getProtoLength(ByteBuffer byteBuffer, int i) {
        final int protoLength = byteBuffer.remaining();
        Preconditions.checkState(protoLength <= i, "protoLength== %s > max = %s", protoLength, i);

        final ByteBuffer encoded = ByteBuffer.allocate(4);
        encoded.putInt(protoLength);
        encoded.flip();
        LOG.debug("protoLength = {}", protoLength);
        Preconditions.checkState(encoded.remaining() == 4);
        return encoded.asReadOnlyBuffer();
    }

    /**
     * Waits for all outstanding chunk writes, unless the output has been cleaned
     * up or the configuration enables flush delay.
     */
    @Override
    public void flush() throws IOException {
        final boolean hasClient = xceiverClientFactory != null && xceiverClient != null;
        if (hasClient && !config.isStreamBufferFlushDelay()) {
            waitFuturesComplete();
        }
    }

    /**
     * Blocks until every outstanding chunk-write future has completed, then
     * clears the list of tracked futures.
     *
     * <p>On failure the tracked futures are left in place. If the waiting thread
     * is interrupted, its interrupt flag is restored before the failure is rethrown.
     *
     * @throws IOException wrapping whatever made the wait fail
     */
    public void waitFuturesComplete() throws IOException {
        try {
            CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
            futures.clear();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            LOG.warn("Failed to write all chunks through stream: " + e);
            throw new IOException(e);
        } catch (Exception e) {
            LOG.warn("Failed to write all chunks through stream: " + e);
            throw new IOException(e);
        }
    }

    /**
     * Sends any unflushed data followed by a putBlock, waits for all queued
     * putBlock responses and then watches for commit on the last index.
     *
     * @param z true when called as part of closing the stream; a putBlock is then
     *          sent even if there is no unflushed data
     */
    private void handleFlush(boolean z) throws IOException, InterruptedException, ExecutionException {
        checkOpen();
        final boolean hasUnflushedData = totalDataFlushedLength < writtenDataLength;
        if (hasUnflushedData) {
            // Send the partially filled buffer, if any, before committing.
            if (currentBuffer != null) {
                writeChunk(currentBuffer);
                currentBuffer = null;
            }
            updateFlushLength();
            executePutBlock(z, false);
        } else if (z) {
            executePutBlock(true, true);
        }
        final CompletableFuture<Void> allPutBlocks =
                CompletableFuture.allOf(putBlockFutures.toArray(EMPTY_FUTURE_ARRAY));
        allPutBlocks.get();
        watchForCommit(false);
        // Surface any failure recorded by the asynchronous callbacks.
        checkOpen();
    }

    /**
     * Flushes remaining data with a closing putBlock, waits for the stream's
     * close reply and finally releases the client.
     *
     * <p>Does nothing if the output has already been cleaned up. The flush must
     * run before {@link #cleanup(boolean)}, because cleanup nulls the client and
     * the flush path refuses to run on a closed output.
     *
     * @throws IOException if flushing or closing the stream fails
     */
    @Override
    public void close() throws IOException {
        if (xceiverClientFactory == null || xceiverClient == null) {
            return;
        }
        try {
            handleFlush(true);
            dataStreamCloseReply.get();
        } catch (ExecutionException e) {
            handleExecutionException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            handleInterruptedException(e, true);
        } finally {
            cleanup(false);
        }
    }

    /**
     * Fails if an earlier request already recorded an exception; otherwise
     * validates the container response.
     *
     * @param containerCommandResponseProto response to validate
     * @throws IOException the earlier recorded failure, or the validation failure
     *         (which is recorded before being rethrown)
     */
    private void validateResponse(ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto) throws IOException {
        try {
            final IOException earlierFailure = getIoException();
            if (earlierFailure != null) {
                throw earlierFailure;
            }
            ContainerProtocolCalls.validateContainerResponse(containerCommandResponseProto);
        } catch (StorageContainerException invalid) {
            setIoException(invalid);
            throw invalid;
        }
    }

    /**
     * Records {@code th}, wrapped in an {@link IOException}, as the failure of
     * this output. Only the first failure is kept; later ones are logged at debug level.
     *
     * @param th the failure to record
     */
    private void setIoException(Throwable th) {
        final IOException earlierFailure = getIoException();
        if (earlierFailure == null) {
            // compareAndSet keeps the first failure if two callbacks race here.
            ioException.compareAndSet(null, new IOException("Unexpected Storage Container Exception: " + th.toString(), th));
            return;
        }
        LOG.debug("Previous request had already failed with " + earlierFailure.toString() + " so subsequent request also encounters Storage Container Exception ", th);
    }

    /**
     * Releases the client back to its factory (if still held), clears the client
     * references, cleans up the commit watcher and shuts down the response executor.
     *
     * @param z passed through as the second argument of {@code releaseClient}
     */
    public void cleanup(boolean z) {
        final XceiverClientFactory factory = xceiverClientFactory;
        if (factory != null) {
            factory.releaseClient(xceiverClient, z, true);
        }
        xceiverClientFactory = null;
        xceiverClient = null;
        commitWatcher.cleanup();
        responseExecutor.shutdown();
    }

    /**
     * Verifies the output can still be used.
     *
     * @throws IOException if the output is closed, or the recorded failure if
     *         an earlier request failed
     */
    private void checkOpen() throws IOException {
        if (isClosed()) {
            throw new IOException("BlockDataStreamOutput has been closed.");
        }
        // The failure reference is only ever set once, so a single read suffices.
        final IOException recorded = getIoException();
        if (recorded != null) {
            throw recorded;
        }
    }

    /** @return true when no client is held, which is the state after {@link #cleanup(boolean)}. */
    public boolean isClosed() {
        return xceiverClient == null;
    }

    /**
     * Decides whether the chunk ending at the given position should be written
     * with the SYNC option. This implementation never requests a sync.
     *
     * @param j end position of the chunk about to be written (unused)
     * @return always false
     */
    private boolean needSync(long j) {
        return false;
    }

    /**
     * Streams one chunk: assigns its offset and name, computes its checksum,
     * writes the data asynchronously and appends the chunk info to the block data.
     *
     * <p>The completion callback runs on the response executor. On success it
     * updates the client's commit info; on failure it records the failure and
     * completes the tracked future exceptionally.
     *
     * @param byteBuffer chunk data; its remaining bytes form the chunk
     */
    private void writeChunkToContainer(ByteBuffer byteBuffer) throws IOException {
        final int chunkLength = byteBuffer.remaining();
        final long offset = chunkOffset.getAndAdd(chunkLength);
        final ChecksumData checksumData = checksum.computeChecksum(byteBuffer.asReadOnlyBuffer());
        final String chunkName = blockID.get().getLocalID() + "_chunk_" + (++chunkIndex);
        final ContainerProtos.ChunkInfo chunkInfo = ContainerProtos.ChunkInfo.newBuilder()
                .setChunkName(chunkName)
                .setOffset(offset)
                .setLen(chunkLength)
                .setChecksumData(checksumData.getProtoBufMessage())
                .build();

        metrics.incrPendingContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing chunk {} length {} at offset {}",
                    chunkInfo.getChunkName(), chunkLength, offset);
        }

        final CompletableFuture<DataStreamReply> pendingWrite;
        if (needSync(offset + chunkLength)) {
            pendingWrite = out.writeAsync(byteBuffer, StandardWriteOption.SYNC);
        } else {
            pendingWrite = out.writeAsync(byteBuffer);
        }

        futures.add(pendingWrite.whenCompleteAsync((reply, failure) -> {
            if (failure == null && reply.isSuccess()) {
                xceiverClient.updateCommitInfosMap(reply.getCommitInfos());
                return;
            }
            // An unsuccessful reply without a throwable still counts as a failure.
            final Throwable cause = failure != null ? failure : new IOException("result is not success");
            final String message = "Failed to write chunk " + chunkInfo.getChunkName() + " into block " + blockID;
            LOG.debug("{}, exception: {}", message, cause.getLocalizedMessage());
            final CompletionException wrapped = new CompletionException(message, cause);
            setIoException(wrapped);
            throw wrapped;
        }, responseExecutor));

        containerBlockData.addChunks(chunkInfo);
    }

    /**
     * Replaces the Ratis client used by this output.
     *
     * @param xceiverClientRatis the client to use from now on
     */
    @VisibleForTesting
    public void setXceiverClient(XceiverClientRatis xceiverClientRatis) {
        xceiverClient = xceiverClientRatis;
    }

    /**
     * Logs the interruption and converts it into an {@link IOException}.
     *
     * @param exc the interruption that occurred
     * @param z   true to also record the failure on this output before throwing;
     *            false to throw a wrapping exception without recording it
     * @throws IOException always
     */
    private void handleInterruptedException(Exception exc, boolean z) throws IOException {
        LOG.error("Command execution was interrupted.");
        if (z) {
            handleExecutionException(exc);
        } else {
            throw new IOException("Unexpected Storage Container Exception: " + exc.toString(), exc);
        }
    }

    /**
     * Records the failure on this output and throws the recorded exception,
     * which is an earlier one if a failure had already been recorded.
     *
     * @param exc the failure to record
     * @throws IOException always
     */
    private void handleExecutionException(Exception exc) throws IOException {
        setIoException(exc);
        final IOException recorded = getIoException();
        throw recorded;
    }

    /** @return the total acknowledged data length as reported by the commit watcher. */
    public long getTotalAckDataLength() {
        return commitWatcher.getTotalAckDataLength();
    }
}
