package org.apache.hadoop.ozone.container.keyvalue.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.util.ReferenceCountedObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.class */
public class TestKeyValueStreamDataChannel {
    public static final Logger LOG = LoggerFactory.getLogger(TestKeyValueStreamDataChannel.class);
    static final ContainerProtos.ContainerCommandRequestProto PUT_BLOCK_PROTO = ContainerProtos.ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos.Type.PutBlock).setPutBlock(ContainerProtos.PutBlockRequestProto.newBuilder().setBlockData(ContainerProtos.BlockData.newBuilder().setBlockID(ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(222).setLocalID(333).build()).build())).setDatanodeUuid("datanodeId").setContainerID(111).build();
    static final int PUT_BLOCK_PROTO_SIZE = PUT_BLOCK_PROTO.toByteString().size();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel$Output.class */
    public static class Output implements DataStreamOutput {
        private final KeyValueStreamDataChannel.Buffers buffers;
        private final ByteBuf outBuf = Unpooled.buffer();
        private final KeyValueStreamDataChannel.WriteMethod writeMethod = byteBuffer -> {
            int remaining = byteBuffer.remaining();
            this.outBuf.writeBytes(byteBuffer);
            return remaining;
        };

        Output(KeyValueStreamDataChannel.Buffers buffers) {
            this.buffers = buffers;
        }

        ByteBuf getOutBuf() {
            return this.outBuf;
        }

        public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer byteBuffer, Iterable<WriteOption> iterable) {
            try {
                return WriteOption.containsOption(iterable, StandardWriteOption.CLOSE) ? closeAsync() : CompletableFuture.completedFuture(new Reply(true, KeyValueStreamDataChannel.writeBuffers(ReferenceCountedObject.wrap(byteBuffer, () -> {
                }, () -> {
                }), this.buffers, this.writeMethod)));
            } catch (IOException e) {
                return TestKeyValueStreamDataChannel.completeExceptionally(e);
            }
        }

        public CompletableFuture<DataStreamReply> closeAsync() {
            try {
                return CompletableFuture.completedFuture(new Reply(true, 0L, KeyValueStreamDataChannel.closeBuffers(this.buffers, this.writeMethod)));
            } catch (IOException e) {
                return TestKeyValueStreamDataChannel.completeExceptionally(e);
            }
        }

        public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount filePositionCount, WriteOption... writeOptionArr) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<RaftClientReply> getRaftClientReplyFuture() {
            throw new UnsupportedOperationException();
        }

        public WritableByteChannel getWritableByteChannel() {
            throw new UnsupportedOperationException();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel$Reply.class */
    public static class Reply implements DataStreamReply {
        private final boolean success;
        private final long bytesWritten;
        private final ContainerProtos.ContainerCommandRequestProto putBlockRequest;

        Reply(boolean z, long j) {
            this(z, j, null);
        }

        Reply(boolean z, long j, ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto) {
            this.success = z;
            this.bytesWritten = j;
            this.putBlockRequest = containerCommandRequestProto;
        }

        ContainerProtos.ContainerCommandRequestProto getPutBlockRequest() {
            return this.putBlockRequest;
        }

        public boolean isSuccess() {
            return this.success;
        }

        public long getBytesWritten() {
            return this.bytesWritten;
        }

        public Collection<RaftProtos.CommitInfoProto> getCommitInfos() {
            throw new UnsupportedOperationException();
        }

        public ClientId getClientId() {
            throw new UnsupportedOperationException();
        }

        public RaftProtos.DataStreamPacketHeaderProto.Type getType() {
            throw new UnsupportedOperationException();
        }

        public long getStreamId() {
            throw new UnsupportedOperationException();
        }

        public long getStreamOffset() {
            throw new UnsupportedOperationException();
        }

        public long getDataLength() {
            throw new UnsupportedOperationException();
        }
    }

    @Test
    public void testSerialization() throws Exception {
        ByteBuffer asReadOnlyByteBuffer = ContainerCommandRequestMessage.toMessage(PUT_BLOCK_PROTO, (String) null).getContent().asReadOnlyByteBuffer();
        ByteBuffer protoLength = BlockDataStreamOutput.getProtoLength(asReadOnlyByteBuffer, 1048576);
        byte[] bArr = new byte[ThreadLocalRandom.current().nextInt(1000) + 100];
        ByteBuf buffer = Unpooled.buffer(1048576);
        buffer.writeBytes(bArr);
        buffer.writeBytes(asReadOnlyByteBuffer);
        buffer.writeBytes(protoLength);
        Assertions.assertEquals(PUT_BLOCK_PROTO, KeyValueStreamDataChannel.readPutBlockRequest(buffer));
    }

    @Test
    public void testBuffers() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(32);
        ArrayList arrayList = new ArrayList();
        int i = PUT_BLOCK_PROTO_SIZE + 4;
        int[] iArr = {i, 2 * i, 10 * i};
        int[] iArr2 = {0, 10, 100, 10000};
        for (int i2 : iArr) {
            for (int i3 : iArr2) {
                arrayList.add(CompletableFuture.supplyAsync(() -> {
                    return runTestBuffers(i3, i2);
                }, newFixedThreadPool));
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((CompletableFuture) it.next()).get();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String runTestBuffers(int i, int i2) {
        int nextInt = ThreadLocalRandom.current().nextInt();
        String format = String.format("[dataSize=%d,max=%d,seed=%H]", Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(nextInt));
        LOG.info(format);
        try {
            runTestBuffers(i, i2, nextInt, format);
            return format;
        } catch (Throwable th) {
            throw new CompletionException("Failed " + format, th);
        }
    }

    static void runTestBuffers(int i, int i2, int i3, String str) throws Exception {
        Assertions.assertTrue(i2 >= PUT_BLOCK_PROTO_SIZE);
        byte[] bArr = new byte[i];
        Random random = new Random(i3);
        random.nextBytes(bArr);
        Output output = new Output(new KeyValueStreamDataChannel.Buffers(i2));
        int i4 = 0;
        while (true) {
            int i5 = i4;
            if (i5 >= i) {
                break;
            }
            int min = Math.min(random.nextInt(4 * i2), i - i5);
            LOG.info("{}: offset = {}, length = {}", new Object[]{str, Integer.valueOf(i5), Integer.valueOf(min)});
            assertReply((DataStreamReply) output.writeAsync(ByteBuffer.wrap(bArr, i5, min), new WriteOption[0]).get(), min, null);
            i4 = i5 + min;
        }
        assertReply((DataStreamReply) BlockDataStreamOutput.executePutBlockClose(PUT_BLOCK_PROTO, i2, output).get(), 0, PUT_BLOCK_PROTO);
        ByteBuf outBuf = output.getOutBuf();
        LOG.info("outBuf = {}", outBuf);
        Assertions.assertEquals(i, outBuf.readableBytes());
        for (int i6 = 0; i6 < i; i6++) {
            Assertions.assertEquals(bArr[i6], outBuf.readByte());
        }
        outBuf.release();
    }

    static void assertReply(DataStreamReply dataStreamReply, int i, ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto) {
        Assertions.assertTrue(dataStreamReply.isSuccess());
        Assertions.assertEquals(i, dataStreamReply.getBytesWritten());
        Assertions.assertEquals(containerCommandRequestProto, ((Reply) dataStreamReply).getPutBlockRequest());
    }

    static CompletableFuture<DataStreamReply> completeExceptionally(Throwable th) {
        CompletableFuture<DataStreamReply> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(th);
        return completableFuture;
    }

    static {
        LOG.info("PUT_BLOCK_PROTO_SIZE = {}", Integer.valueOf(PUT_BLOCK_PROTO_SIZE));
    }
}
