package org.apache.ratis.client.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.OrderedStreamAsync;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SlidingWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ratis/client/impl/DataStreamClientImpl.class */
public class DataStreamClientImpl implements DataStreamClient {
    /** Logger of this class; also used by the inner stream output for debug messages. */
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) DataStreamClientImpl.class);
    /** Raft client used to send the forward request when a stream is closed; {@code null} if this object was built without one. */
    private final RaftClient client;
    /** Id of this client; put into request headers and replies built by this class. */
    private final ClientId clientId;
    /** Raft group id set on the requests built by {@code stream(ByteBuffer, RoutingTable)}. */
    private final RaftGroupId groupId;
    /** The peer whose id is used as the server id of new stream requests and checked against the routing table primary. */
    private final RaftPeer dataStreamServer;
    /** The underlying RPC; returned by {@code getClientRpc()} and closed by {@code close()}. */
    private final DataStreamClientRpc dataStreamClientRpc;
    /** Sender through which all stream packets of this client are submitted. */
    private final OrderedStreamAsync orderedStreamAsync;
    /** Configuration flag read in the constructors; when {@code true}, a closing write is not followed by a forward request. */
    private final boolean skipSendForward;

    /* loaded from: input_file:org/apache/ratis/client/impl/DataStreamClientImpl$DataStreamOutputImpl.class */
    public final class DataStreamOutputImpl implements DataStreamOutputRpc {
        /** The request identifying this stream; its serialized form is sent as the stream header. */
        private final RaftClientRequest header;
        /** Reply future of the {@code STREAM_HEADER} packet sent by the constructor. */
        private final CompletableFuture<DataStreamReply> headerFuture;
        /** Sliding window passed to {@code orderedStreamAsync.sendRequest(..)} for every packet of this stream. */
        private final SlidingWindow.Client<OrderedStreamAsync.DataStreamWindowRequest, DataStreamReply> slidingWindow;
        /** Completed from the result of {@link #closeFuture} converted by {@code ClientProtoUtils.getRaftClientReply}. */
        private final CompletableFuture<RaftClientReply> raftClientReplyFuture;
        /** Set when a write carrying the {@code CLOSE} option is issued; non-null means this stream is closed. */
        private CompletableFuture<DataStreamReply> closeFuture;
        /** Memoized supplier of the blocking {@link WritableByteChannel} view of this stream. */
        private final MemoizedSupplier<WritableByteChannel> writableByteChannelSupplier;
        /** Offset placed in the header of the next packet; advanced by the length of each data write. */
        private long streamOffset;

        /**
         * Creates the output of one stream and immediately sends the stream header.
         *
         * <p>The serialized form of {@code raftClientRequest} is sent as a
         * {@code STREAM_HEADER} packet with the {@code FLUSH} option; the reply
         * future of that packet is kept in {@code headerFuture}.
         *
         * @param raftClientRequest the request identifying this stream; kept as the header
         */
        private DataStreamOutputImpl(RaftClientRequest raftClientRequest) {
            this.raftClientReplyFuture = new CompletableFuture<>();
            // Blocking channel view over writeAsync, wrapped in a memoized supplier.
            this.writableByteChannelSupplier = JavaUtils.memoize(() -> new WritableByteChannel() {
                @Override
                public int write(ByteBuffer byteBuffer) throws IOException {
                    // Read the size before sending; it is only used in the error message.
                    final int remaining = byteBuffer.remaining();
                    final DataStreamReply reply = IOUtils.getFromFuture(
                            DataStreamOutputImpl.this.writeAsync(byteBuffer, StandardWriteOption.FLUSH),
                            () -> "write(" + remaining + " bytes for " + ClientInvocationId.valueOf(DataStreamOutputImpl.this.header) + ")");
                    // Fails with ArithmeticException if the written byte count does not fit in an int.
                    return Math.toIntExact(reply.getBytesWritten());
                }

                @Override
                public boolean isOpen() {
                    return !DataStreamOutputImpl.this.isClosed();
                }

                @Override
                public void close() throws IOException {
                    if (DataStreamOutputImpl.this.isClosed()) {
                        return;
                    }
                    // Closing is an empty write carrying the CLOSE option; wait for its reply.
                    IOUtils.getFromFuture(
                            DataStreamOutputImpl.this.writeAsync(DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER, StandardWriteOption.CLOSE),
                            () -> "close(" + ClientInvocationId.valueOf(DataStreamOutputImpl.this.header) + ")");
                }
            });
            this.streamOffset = 0L;
            this.header = raftClientRequest;
            this.slidingWindow = new SlidingWindow.Client<>(ClientInvocationId.valueOf(DataStreamClientImpl.this.clientId, this.header.getCallId()));
            // Keep the serialized header in a local so that the same buffer supplies both
            // the payload and its length.
            final ByteBuffer headerBuffer = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(this.header);
            this.headerFuture = send(RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_HEADER, headerBuffer,
                    headerBuffer.remaining(), Collections.singleton(StandardWriteOption.FLUSH));
        }

        /**
         * Submits one packet of this stream through the ordered sender.
         *
         * @param type     the packet type
         * @param obj      the payload handed to the sender
         * @param j        the payload length recorded in the packet header
         * @param iterable the write options recorded in the packet header
         * @return the reply future returned by the sender
         */
        private CompletableFuture<DataStreamReply> send(RaftProtos.DataStreamPacketHeaderProto.Type type, Object obj, long j, Iterable<WriteOption> iterable) {
            // The packet header carries the current stream offset; callers advance it afterwards.
            final DataStreamRequestHeader packetHeader = new DataStreamRequestHeader(
                    this.header.getClientId(), type, this.header.getCallId(), this.streamOffset, j, iterable);
            return DataStreamClientImpl.this.orderedStreamAsync.sendRequest(packetHeader, obj, this.slidingWindow);
        }

        /**
         * Combines a packet reply with the stream-header reply.
         *
         * <p>The returned future completes once both the given future and the
         * header future have completed.
         *
         * @param completableFuture the reply future of a packet
         * @return a future yielding the packet reply when the header reply is
         *         successful, and the header reply otherwise
         */
        private CompletableFuture<DataStreamReply> combineHeader(CompletableFuture<DataStreamReply> completableFuture) {
            // No raw CompletionStage cast here: with a raw argument the lambda parameters
            // would be typed as Object and isSuccess() could not be resolved.
            return completableFuture.thenCombine(this.headerFuture,
                    (reply, headerReply) -> headerReply.isSuccess() ? reply : headerReply);
        }

        /**
         * Sends one {@code STREAM_DATA} packet and advances the stream offset.
         *
         * <p>When the options contain {@code CLOSE}, this call also records the
         * close future (marking the stream as closed) and arranges for the raft
         * client reply future to be completed from its result.
         *
         * @param obj      the payload
         * @param j        the payload length in bytes
         * @param iterable the write options of this packet
         * @return the packet reply combined with the header reply; a future failed
         *         with {@link AlreadyClosedException} if the stream is already closed
         */
        private CompletableFuture<DataStreamReply> writeAsyncImpl(Object obj, long j, Iterable<WriteOption> iterable) {
            if (isClosed()) {
                return JavaUtils.completeExceptionally(new AlreadyClosedException(DataStreamClientImpl.this.clientId + ": stream already closed, request=" + this.header));
            }
            final CompletableFuture<DataStreamReply> reply = combineHeader(send(RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA, obj, j, iterable));
            if (WriteOption.containsOption(iterable, StandardWriteOption.CLOSE)) {
                // The forward request is sent only when it is not disabled and a RaftClient is available.
                final boolean forward = !DataStreamClientImpl.this.skipSendForward && DataStreamClientImpl.this.client != null;
                this.closeFuture = forward ? reply.thenCompose(this::sendForward) : reply;
                // Propagate the close result (or failure) to the raft client reply future.
                this.closeFuture.thenApply(ClientProtoUtils::getRaftClientReply)
                        .whenComplete(JavaUtils.asBiConsumer(this.raftClientReplyFuture));
            }
            this.streamOffset += j;
            return reply;
        }

        /**
         * Writes the remaining bytes of the given buffer asynchronously.
         *
         * @param byteBuffer the data to write; its remaining byte count is used as the length
         * @param iterable   the write options
         * @return the reply future of this write
         */
        @Override
        public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer byteBuffer, Iterable<WriteOption> iterable) {
            final long length = byteBuffer.remaining();
            return writeAsyncImpl(byteBuffer, length, iterable);
        }

        /**
         * Writes the file region described by the given object asynchronously.
         *
         * @param filePositionCount the source description; its count is used as the length
         * @param writeOptionArr    the write options
         * @return the reply future of this write
         */
        @Override
        public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount filePositionCount, WriteOption... writeOptionArr) {
            final long length = filePositionCount.getCount();
            final Iterable<WriteOption> options = Arrays.asList(writeOptionArr);
            return writeAsyncImpl(filePositionCount, length, options);
        }

        /**
         * Tells whether this stream is closed.
         *
         * @return {@code true} once the close future has been set, i.e. after a
         *         write carrying the {@code CLOSE} option has been issued
         */
        boolean isClosed() {
            final CompletableFuture<DataStreamReply> close = this.closeFuture;
            return close != null;
        }

        /**
         * Closes this stream asynchronously.
         *
         * <p>If the stream is not yet closed, an empty write carrying the
         * {@code CLOSE} option is issued first.
         *
         * @return the close future
         * @throws NullPointerException if the close future is still unset after the closing write
         */
        @Override
        public CompletableFuture<DataStreamReply> closeAsync() {
            if (!isClosed()) {
                // The closing write is expected to set closeFuture before it returns.
                writeAsync(DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER, StandardWriteOption.CLOSE);
            }
            // requireNonNull already returns the parameterized type, so no raw cast is needed.
            return Objects.requireNonNull(this.closeFuture, "closeFuture == null");
        }

        /**
         * @return the request that was passed to the constructor and identifies this stream
         */
        public RaftClientRequest getHeader() {
            return header;
        }

        /**
         * @return the reply future of the stream-header packet sent at construction
         */
        @Override
        public CompletableFuture<DataStreamReply> getHeaderFuture() {
            return headerFuture;
        }

        /**
         * @return the future that is completed from the result of the closing write
         */
        @Override
        public CompletableFuture<RaftClientReply> getRaftClientReplyFuture() {
            return raftClientReplyFuture;
        }

        /**
         * @return the blocking channel view of this stream, obtained from the memoized supplier
         */
        @Override
        public WritableByteChannel getWritableByteChannel() {
            final WritableByteChannel channel = writableByteChannelSupplier.get();
            return channel;
        }

        /**
         * Sends the forward request for this stream after a successful closing write.
         *
         * @param dataStreamReply the reply of the closing write
         * @return the given reply unchanged if it is unsuccessful; otherwise a reply
         *         built from the given reply and the raft client reply of the forward request
         */
        private CompletableFuture<DataStreamReply> sendForward(DataStreamReply dataStreamReply) {
            DataStreamClientImpl.LOG.debug("sendForward {}", dataStreamReply);
            if (!dataStreamReply.isSuccess()) {
                // Nothing to forward when the closing write failed.
                return CompletableFuture.completedFuture(dataStreamReply);
            }
            final AsyncRpcApi asyncApi = (AsyncRpcApi) DataStreamClientImpl.this.client.async();
            return asyncApi.sendForward(this.header).thenApply(forwardReply -> DataStreamReplyByteBuffer.newBuilder()
                    .setClientId(DataStreamClientImpl.this.clientId)
                    .setType(dataStreamReply.getType())
                    .setStreamId(dataStreamReply.getStreamId())
                    .setStreamOffset(dataStreamReply.getStreamOffset())
                    // The serialized raft client reply becomes the buffer of the stream reply.
                    .setBuffer(ClientProtoUtils.toRaftClientReplyProto(forwardReply).toByteString().asReadOnlyByteBuffer())
                    .setSuccess(forwardReply.isSuccess())
                    .setBytesWritten(dataStreamReply.getBytesWritten())
                    .setCommitInfos(forwardReply.getCommitInfos())
                    .build());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a data stream client that has no {@link RaftClient}.
     *
     * @param clientId            the id of this client
     * @param raftGroupId         the raft group id
     * @param raftPeer            the data stream server
     * @param dataStreamClientRpc the underlying RPC
     * @param raftProperties      the properties used to read the skip-send-forward
     *                            setting and to create the ordered sender
     */
    public DataStreamClientImpl(ClientId clientId, RaftGroupId raftGroupId, RaftPeer raftPeer, DataStreamClientRpc dataStreamClientRpc, RaftProperties raftProperties) {
        // Read the configuration first, as the original statement order does.
        this.skipSendForward = RaftConfigKeys.DataStream.skipSendForward(raftProperties, LOG::info);
        this.dataStreamClientRpc = dataStreamClientRpc;
        this.dataStreamServer = raftPeer;
        this.groupId = raftGroupId;
        this.clientId = clientId;
        // No RaftClient is available through this constructor.
        this.client = null;
        this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, raftProperties);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a data stream client backed by the given {@link RaftClient}.
     *
     * <p>The client id and the group id are taken from {@code raftClient}.
     *
     * @param raftClient          the raft client
     * @param raftPeer            the data stream server
     * @param dataStreamClientRpc the underlying RPC
     * @param raftProperties      the properties used to read the skip-send-forward
     *                            setting and to create the ordered sender
     */
    public DataStreamClientImpl(RaftClient raftClient, RaftPeer raftPeer, DataStreamClientRpc dataStreamClientRpc, RaftProperties raftProperties) {
        // Read the configuration first, as the original statement order does.
        this.skipSendForward = RaftConfigKeys.DataStream.skipSendForward(raftProperties, LOG::info);
        this.client = raftClient;
        this.clientId = raftClient.getId();
        this.groupId = raftClient.getGroupId();
        this.dataStreamClientRpc = dataStreamClientRpc;
        this.dataStreamServer = raftPeer;
        this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, raftProperties);
    }

    /**
     * @return the underlying RPC that was passed to the constructor
     */
    @Override
    public DataStreamClientRpc getClientRpc() {
        return dataStreamClientRpc;
    }

    /**
     * Opens a stream whose header is the given request.
     *
     * @param raftClientRequest the request used as the stream header
     * @return a new stream output; its constructor sends the header
     */
    @Override
    public DataStreamOutputRpc stream(RaftClientRequest raftClientRequest) {
        final DataStreamOutputImpl output = new DataStreamOutputImpl(raftClientRequest);
        return output;
    }

    /**
     * Opens a stream without a routing table.
     *
     * @param byteBuffer the header message, may be {@code null}
     * @return a new stream output
     */
    @Override
    public DataStreamOutputRpc stream(ByteBuffer byteBuffer) {
        final RoutingTable noRoutingTable = null;
        return stream(byteBuffer, noRoutingTable);
    }

    /**
     * Opens a stream with an optional header message and an optional routing table.
     *
     * <p>When a routing table is given, its primary must equal the id of the
     * data stream server of this client.
     *
     * @param byteBuffer   the header message, may be {@code null}
     * @param routingTable the routing table, may be {@code null}
     * @return a new stream output for a newly built data stream request
     */
    @Override
    public DataStreamOutputRpc stream(ByteBuffer byteBuffer, RoutingTable routingTable) {
        if (routingTable != null) {
            final boolean primaryMatched = this.dataStreamServer.getId().equals(routingTable.getPrimary());
            Preconditions.assertTrue(primaryMatched, (Supplier<Object>) () ->
                    "Primary peer mismatched: the routing table has " + routingTable.getPrimary() + " but the client has " + this.dataStreamServer.getId());
        }
        // The message is computed inside the chain so that the evaluation order stays as before.
        final RaftClientRequest request = RaftClientRequest.newBuilder()
                .setClientId(this.clientId)
                .setServerId(this.dataStreamServer.getId())
                .setGroupId(this.groupId)
                .setCallId(CallId.getAndIncrement())
                .setMessage((Message) Optional.ofNullable(byteBuffer).map(ByteString::copyFrom).map(Message::valueOf).orElse(null))
                .setType(RaftClientRequest.dataStreamRequestType())
                .setRoutingTable(routingTable)
                .build();
        return new DataStreamOutputImpl(request);
    }

    /**
     * Closes the underlying RPC.
     *
     * @throws IOException if closing the RPC fails
     */
    @Override
    public void close() throws IOException {
        final DataStreamClientRpc rpc = this.dataStreamClientRpc;
        rpc.close();
    }
}
