package org.apache.hadoop.ozone.container.keyvalue.impl;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.class */
/**
 * A stream data channel that writes incoming buffers to the underlying file channel while
 * holding back the tail of the stream in memory.
 *
 * <p>The held-back tail is consumed on {@link #close()}: its final bytes are parsed as a
 * length-suffixed PutBlock request (see {@link #readPutBlockRequest(ByteBuf)}) and everything in
 * front of that request is written to the file channel.
 */
public class KeyValueStreamDataChannel extends StreamDataChannelBase {
    public static final Logger LOG = LoggerFactory.getLogger(KeyValueStreamDataChannel.class);

    /**
     * Number of trailing stream bytes kept in memory until {@link #close()}.
     * NOTE(review): presumably the upper bound on the serialized PutBlock request plus its
     * length suffix -- confirm against the writer side.
     */
    private static final int BUFFERED_TAIL_LENGTH_MAX = 1 << 20;

    private final Buffers buffers;
    private final AtomicReference<ContainerProtos.ContainerCommandRequestProto> putBlockRequest;
    private final AtomicBoolean closed;

    /**
     * A FIFO of retained buffers that always keeps at least {@code max} buffered bytes (once that
     * many have been offered); older buffers beyond that are handed out as "extra" buffers which
     * are safe to write.
     */
    public static class Buffers {
        private final Deque<ReferenceCountedObject<ByteBuffer>> deque = new LinkedList<>();
        private final int max;
        /** Sum of {@code remaining()} of the queued buffers, as measured when each was offered. */
        private int length;

        /**
         * @param i the number of trailing bytes that must stay buffered.
         */
        Buffers(int i) {
            this.max = i;
        }

        /**
         * @param i size of the candidate (head) buffer.
         * @return true if at least {@code max} bytes would remain buffered without the candidate.
         */
        private boolean isExtra(int i) {
            return length - i >= max;
        }

        /**
         * @return true if the head buffer exists and can be removed while still keeping
         *         at least {@code max} bytes buffered.
         */
        public boolean hasExtraBuffer() {
            return Optional.ofNullable(deque.peek())
                .map(ReferenceCountedObject::get)
                .filter(head -> isExtra(head.remaining()))
                .isPresent();
        }

        /**
         * Retains the given buffer and appends it to the queue.
         *
         * @param referenceCountedObject the buffer to enqueue; it is retained here and must be
         *        released by whoever eventually polls it.
         * @return the extra buffers which are safe to be written. The returned iterable is lazy:
         *         each {@code next()} removes one buffer from this queue, and the consumer is
         *         responsible for releasing it.
         */
        Iterable<ReferenceCountedObject<ByteBuffer>> offer(ReferenceCountedObject<ByteBuffer> referenceCountedObject) {
            final ByteBuffer buffer = referenceCountedObject.retain();
            KeyValueStreamDataChannel.LOG.debug("offer {}", buffer);
            Preconditions.checkState(deque.offer(referenceCountedObject), "Failed to offer");
            length += buffer.remaining();

            return () -> new Iterator<ReferenceCountedObject<ByteBuffer>>() {
                @Override
                public boolean hasNext() {
                    return hasExtraBuffer();
                }

                @Override
                public ReferenceCountedObject<ByteBuffer> next() {
                    // Check before mutating: polling first would drop a retained buffer
                    // (without releasing it) and corrupt the length when nothing is extra.
                    if (!hasExtraBuffer()) {
                        throw new NoSuchElementException("No extra buffer: length=" + length + ", max=" + max);
                    }
                    final ReferenceCountedObject<ByteBuffer> polled = poll();
                    length -= polled.get().remaining();
                    Preconditions.checkState(length >= max);
                    return polled;
                }
            };
        }

        /**
         * Removes and returns the head buffer without touching {@code length}.
         *
         * @throws NullPointerException if the queue is empty.
         */
        ReferenceCountedObject<ByteBuffer> poll() {
            final ReferenceCountedObject<ByteBuffer> polled = Objects.requireNonNull(deque.poll());
            RatisHelper.debug(polled.get(), "polled", KeyValueStreamDataChannel.LOG);
            return polled;
        }

        /**
         * Drains the queue into a single read-only {@link ByteBuf} wrapping all queued buffers
         * in order.
         *
         * @return a reference-counted view whose release also releases the wrapping buffer and
         *         every drained buffer; its retain action is a no-op.
         * @throws IllegalStateException if the queue is empty.
         */
        ReferenceCountedObject<ByteBuf> pollAll() {
            Preconditions.checkState(!deque.isEmpty(), "The deque is empty");
            final int size = deque.size();
            final ByteBuffer[] array = new ByteBuffer[size];
            final List<ReferenceCountedObject<ByteBuffer>> refs = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                final ReferenceCountedObject<ByteBuffer> ref = poll();
                refs.add(ref);
                array[i] = ref.get();
            }
            // The queue is empty now; keep the byte counter consistent with it.
            length = 0;

            final ByteBuf combined = Unpooled.wrappedBuffer(array).asReadOnly();
            return ReferenceCountedObject.wrap(combined, () -> {
            }, () -> {
                combined.release();
                refs.forEach(ReferenceCountedObject::release);
            });
        }

        /** Releases and discards every queued buffer. */
        void cleanUpAll() {
            while (!deque.isEmpty()) {
                poll().release();
            }
            // Nothing is buffered any more.
            length = 0;
        }
    }

    /** A write operation that may throw {@link IOException}. */
    public interface WriteMethod {
        /**
         * @param byteBuffer the source buffer.
         * @return a number checked by the caller: a non-positive value is treated as failure.
         *         NOTE(review): presumably the number of bytes written -- confirm.
         */
        int applyAsInt(ByteBuffer byteBuffer) throws IOException;
    }

    /**
     * Creates an open channel; the arguments are passed through to the base class.
     */
    public KeyValueStreamDataChannel(File file, ContainerData containerData, ContainerMetrics containerMetrics) throws StorageContainerException {
        super(file, containerData, containerMetrics);
        this.buffers = new Buffers(BUFFERED_TAIL_LENGTH_MAX);
        this.putBlockRequest = new AtomicReference<>();
        this.closed = new AtomicBoolean();
    }

    @Override // org.apache.hadoop.ozone.container.keyvalue.impl.StreamDataChannelBase
    ContainerProtos.Type getType() {
        return ContainerProtos.Type.StreamWrite;
    }

    /**
     * Buffers the given data and writes any older buffered data that is no longer needed
     * in memory, updating the operation count and latency metrics.
     *
     * @return the value returned by {@link #writeBuffers}.
     * @throws IOException if this channel is already closed or the write fails.
     */
    public int write(ReferenceCountedObject<ByteBuffer> referenceCountedObject) throws IOException {
        getMetrics().incContainerOpsMetrics(getType());
        assertOpen();

        final long startTime = Time.monotonicNow();
        final int written = writeBuffers(referenceCountedObject, buffers, super::writeFileChannel);
        getMetrics().incContainerOpsLatencies(getType(), Time.monotonicNow() - startTime);
        return written;
    }

    /**
     * Offers the given buffer to {@code buffers} and fully writes each extra buffer it yields,
     * releasing every such buffer whether or not its write succeeds.
     *
     * @return {@code remaining()} of the offered buffer after the call.
     * @throws IOException if a write fails.
     */
    static int writeBuffers(ReferenceCountedObject<ByteBuffer> referenceCountedObject, Buffers buffers, WriteMethod writeMethod) throws IOException {
        for (ReferenceCountedObject<ByteBuffer> extra : buffers.offer(referenceCountedObject)) {
            try {
                writeFully(extra.get(), writeMethod);
            } finally {
                extra.release();
            }
        }
        return referenceCountedObject.get().remaining();
    }

    /**
     * Invokes the write method repeatedly until the buffer has no remaining bytes.
     *
     * @throws IOException if the write method fails or returns a non-positive value.
     */
    private static void writeFully(ByteBuffer byteBuffer, WriteMethod writeMethod) throws IOException {
        while (byteBuffer.remaining() > 0) {
            if (writeMethod.applyAsInt(byteBuffer) <= 0) {
                throw new IOException("Unable to write");
            }
        }
    }

    /**
     * @return the PutBlock request recorded by {@link #close()}.
     * @throws NullPointerException if no request has been recorded.
     */
    public ContainerProtos.ContainerCommandRequestProto getPutBlockRequest() {
        return Objects.requireNonNull(putBlockRequest.get(), () -> "putBlockRequest == null, " + this);
    }

    /**
     * @throws IOException if this channel has been closed.
     */
    void assertOpen() throws IOException {
        if (closed.get()) {
            throw new IOException("Already closed: " + this);
        }
    }

    /**
     * On the first call only: drains the buffered tail, records the PutBlock request parsed
     * from it, and then closes the base channel even if draining failed.
     */
    @Override // org.apache.hadoop.ozone.container.keyvalue.impl.StreamDataChannelBase
    public void close() throws IOException {
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        try {
            putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
        } finally {
            super.close();
        }
    }

    /**
     * Releases all buffered data without writing it and closes the base channel
     * unless this channel was already closed.
     */
    @Override // org.apache.hadoop.ozone.container.keyvalue.impl.StreamDataChannelBase
    protected void cleanupInternal() throws IOException {
        buffers.cleanUpAll();
        if (closed.compareAndSet(false, true)) {
            super.close();
        }
    }

    /**
     * Drains {@code buffers}, parses the PutBlock request from the end of the drained data,
     * and writes the data preceding the request using {@code writeMethod}.
     *
     * @return the parsed PutBlock request.
     * @throws IOException if the request cannot be parsed or the write fails.
     * @throws IllegalStateException if {@code buffers} is empty.
     */
    static ContainerProtos.ContainerCommandRequestProto closeBuffers(Buffers buffers, WriteMethod writeMethod) throws IOException {
        final ReferenceCountedObject<ByteBuf> ref = buffers.pollAll();
        final ByteBuf buf = ref.retain();
        try {
            final ContainerProtos.ContainerCommandRequestProto request = readPutBlockRequest(buf);
            // readPutBlockRequest has moved the writer index so that only the data remains.
            writeFully(buf.nioBuffer(), writeMethod);
            return request;
        } finally {
            ref.release();
        }
    }

    /**
     * Reads the int located at {@code i} of the given buffer.
     *
     * @param byteBuf a buffer whose reader index may be advanced by this method;
     *        callers pass a duplicate to protect their own indices.
     * @param i the index of the length field; must not be before the reader index.
     * @throws IllegalStateException if {@code i} is before the reader index.
     */
    private static int readProtoLength(ByteBuf byteBuf, int i) {
        final int readerIndex = byteBuf.readerIndex();
        LOG.debug("{}, lengthIndex = {}, readerIndex = {}", byteBuf, i, readerIndex);
        if (i > readerIndex) {
            byteBuf.readerIndex(i);
        } else {
            Preconditions.checkState(i == readerIndex);
        }
        RatisHelper.debug(byteBuf, "readProtoLength", LOG);
        return byteBuf.nioBuffer().getInt();
    }

    /**
     * Parses the PutBlock request at the end of the readable bytes, which are laid out as
     * {@code [data][proto][int protoLength]}, and then sets the writer index to the start of
     * the proto so that only {@code [data]} remains readable.
     *
     * @throws IOException if the proto cannot be extracted or is not a PutBlock request;
     *         the original failure is attached as the cause.
     */
    static ContainerProtos.ContainerCommandRequestProto readPutBlockRequest(ByteBuf byteBuf) throws IOException {
        final int readerIndex = byteBuf.readerIndex();
        final int lengthIndex = readerIndex + byteBuf.readableBytes() - Integer.BYTES;
        final int protoLength = readProtoLength(byteBuf.duplicate(), lengthIndex);
        final int protoIndex = lengthIndex - protoLength;
        try {
            final ContainerProtos.ContainerCommandRequestProto request =
                readPutBlockRequest(byteBuf.slice(protoIndex, protoLength).nioBuffer());
            byteBuf.writerIndex(protoIndex);
            return request;
        } catch (Throwable th) {
            RatisHelper.debug(byteBuf, "catch", LOG);
            throw new IOException("Failed to readPutBlockRequest from " + byteBuf + ": readerIndex=" + readerIndex + ", protoIndex=" + protoIndex + ", protoLength=" + protoLength + ", lengthIndex=" + lengthIndex, th);
        }
    }

    /**
     * Converts the given bytes to a container command request.
     *
     * @throws StorageContainerException with {@code MALFORMED_REQUEST}
     *         if the request does not carry a PutBlock.
     */
    private static ContainerProtos.ContainerCommandRequestProto readPutBlockRequest(ByteBuffer byteBuffer) throws IOException {
        RatisHelper.debug(byteBuffer, "readPutBlockRequest", LOG);
        final ContainerProtos.ContainerCommandRequestProto request =
            ContainerCommandRequestMessage.toProto(ByteString.copyFrom(byteBuffer), (RaftGroupId) null);
        if (!request.hasPutBlock()) {
            throw new StorageContainerException("Malformed PutBlock request. trace ID: " + request.getTraceID(), ContainerProtos.Result.MALFORMED_REQUEST);
        }
        return request;
    }

    /** Delegates to the base-class implementation. */
    @Override // org.apache.hadoop.ozone.container.keyvalue.impl.StreamDataChannelBase
    public String toString() {
        return super.toString();
    }

    /** Delegates to the base-class implementation. */
    @Override // org.apache.hadoop.ozone.container.keyvalue.impl.StreamDataChannelBase
    public ContainerMetrics getMetrics() {
        return super.getMetrics();
    }

    /** Delegates to the base-class implementation. */
    @Override // org.apache.hadoop.ozone.container.keyvalue.impl.StreamDataChannelBase
    public boolean cleanUp() {
        return super.cleanUp();
    }

    /** Delegates to the base-class implementation. */
    @Override // org.apache.hadoop.ozone.container.keyvalue.impl.StreamDataChannelBase
    public void setLinked() {
        super.setLinked();
    }
}
