package org.apache.ratis.client.impl;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SlidingWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ratis/client/impl/OrderedStreamAsync.class */
public class OrderedStreamAsync {
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) OrderedStreamAsync.class);
    private final DataStreamClientRpc dataStreamClientRpc;
    private final SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> slidingWindow;
    private final Semaphore requestSemaphore;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/client/impl/OrderedStreamAsync$DataStreamWindowRequest.class */
    public static class DataStreamWindowRequest implements SlidingWindow.ClientSideRequest<DataStreamReply> {
        private final DataStreamRequestHeader header;
        private final Object data;
        private final long seqNum;
        private final CompletableFuture<DataStreamReply> replyFuture = new CompletableFuture<>();

        DataStreamWindowRequest(DataStreamRequestHeader dataStreamRequestHeader, Object obj, long j) {
            this.header = dataStreamRequestHeader;
            this.data = obj;
            this.seqNum = j;
        }

        DataStreamRequest getDataStreamRequest() {
            if (this.header.getDataLength() == 0) {
                return new DataStreamRequestByteBuffer(this.header, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
            }
            if (this.data instanceof ByteBuffer) {
                return new DataStreamRequestByteBuffer(this.header, (ByteBuffer) this.data);
            }
            if (this.data instanceof FilePositionCount) {
                return new DataStreamRequestFilePositionCount(this.header, (FilePositionCount) this.data);
            }
            throw new IllegalStateException("Unexpected " + this.data.getClass());
        }

        @Override // org.apache.ratis.util.SlidingWindow.ClientSideRequest
        public void setFirstRequest() {
        }

        @Override // org.apache.ratis.util.SlidingWindow.Request
        public long getSeqNum() {
            return this.seqNum;
        }

        @Override // org.apache.ratis.util.SlidingWindow.Request
        public void setReply(DataStreamReply dataStreamReply) {
            this.replyFuture.complete(dataStreamReply);
        }

        @Override // org.apache.ratis.util.SlidingWindow.Request
        public boolean hasReply() {
            return this.replyFuture.isDone();
        }

        @Override // org.apache.ratis.util.SlidingWindow.Request
        public void fail(Throwable th) {
            this.replyFuture.completeExceptionally(th);
        }

        public CompletableFuture<DataStreamReply> getReplyFuture() {
            return this.replyFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OrderedStreamAsync(ClientId clientId, DataStreamClientRpc dataStreamClientRpc, RaftProperties raftProperties) {
        this.dataStreamClientRpc = dataStreamClientRpc;
        this.slidingWindow = new SlidingWindow.Client<>(clientId);
        this.requestSemaphore = new Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(raftProperties));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader dataStreamRequestHeader, Object obj) {
        try {
            this.requestSemaphore.acquire();
            return this.slidingWindow.submitNewRequest(j -> {
                return new DataStreamWindowRequest(dataStreamRequestHeader, obj, j);
            }, this::sendRequestToNetwork).getReplyFuture().whenComplete((dataStreamReply, th) -> {
                this.requestSemaphore.release();
            });
        } catch (InterruptedException e) {
            return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException("Interrupted when sending " + JavaUtils.getClassSimpleName(obj.getClass()) + ", header= " + dataStreamRequestHeader, e));
        }
    }

    private void sendRequestToNetwork(DataStreamWindowRequest dataStreamWindowRequest) {
        CompletableFuture<DataStreamReply> replyFuture = dataStreamWindowRequest.getReplyFuture();
        if (replyFuture.isDone()) {
            return;
        }
        if (this.slidingWindow.isFirst(dataStreamWindowRequest.getSeqNum())) {
            dataStreamWindowRequest.setFirstRequest();
        }
        CompletableFuture<DataStreamReply> streamAsync = this.dataStreamClientRpc.streamAsync(dataStreamWindowRequest.getDataStreamRequest());
        long seqNum = dataStreamWindowRequest.getSeqNum();
        streamAsync.thenApply(dataStreamReply -> {
            this.slidingWindow.receiveReply(seqNum, dataStreamReply, this::sendRequestToNetwork);
            return dataStreamReply;
        }).thenAccept((Consumer<? super U>) dataStreamReply2 -> {
            if (replyFuture.isDone()) {
                return;
            }
            replyFuture.complete(dataStreamReply2);
        });
    }
}
