package org.apache.ratis.netty.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ratis/netty/server/DataStreamManagement.class */
public class DataStreamManagement {
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) DataStreamManagement.class);
    private final RaftServer server;
    private final String name;
    private final StreamMap streams = new StreamMap();
    private final Executor requestExecutor;
    private final Executor writeExecutor;
    private final NettyServerStreamRpcMetrics nettyServerStreamRpcMetrics;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/netty/server/DataStreamManagement$LocalStream.class */
    public static class LocalStream {
        private final CompletableFuture<StateMachine.DataStream> streamFuture;
        private final AtomicReference<CompletableFuture<Long>> writeFuture;
        private final NettyServerStreamRpcMetrics.RequestMetrics metrics;

        LocalStream(CompletableFuture<StateMachine.DataStream> completableFuture, NettyServerStreamRpcMetrics.RequestMetrics requestMetrics) {
            this.streamFuture = completableFuture;
            this.writeFuture = new AtomicReference<>(completableFuture.thenApply(dataStream -> {
                return 0L;
            }));
            this.metrics = requestMetrics;
        }

        CompletableFuture<Long> write(ByteBuf byteBuf, Iterable<WriteOption> iterable, Executor executor) {
            NettyServerStreamRpcMetrics.RequestContext start = this.metrics.start();
            return DataStreamManagement.composeAsync(this.writeFuture, executor, l -> {
                return this.streamFuture.thenCompose(dataStream -> {
                    return DataStreamManagement.writeToAsync(byteBuf, iterable, dataStream, executor).whenComplete((l, th) -> {
                        this.metrics.stop(start, th == null);
                    });
                });
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/netty/server/DataStreamManagement$RemoteStream.class */
    public static class RemoteStream {
        private final DataStreamOutputRpc out;
        private final AtomicReference<CompletableFuture<DataStreamReply>> sendFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));
        private final NettyServerStreamRpcMetrics.RequestMetrics metrics;

        RemoteStream(DataStreamOutputRpc dataStreamOutputRpc, NettyServerStreamRpcMetrics.RequestMetrics requestMetrics) {
            this.metrics = requestMetrics;
            this.out = dataStreamOutputRpc;
        }

        CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf dataStreamRequestByteBuf, Executor executor) {
            NettyServerStreamRpcMetrics.RequestContext start = this.metrics.start();
            return DataStreamManagement.composeAsync(this.sendFuture, executor, dataStreamReply -> {
                return this.out.writeAsync(dataStreamRequestByteBuf.slice().nioBuffer(), dataStreamRequestByteBuf.getWriteOptionList()).whenComplete((dataStreamReply, th) -> {
                    this.metrics.stop(start, th == null);
                });
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/netty/server/DataStreamManagement$StreamInfo.class */
    public static class StreamInfo {
        private final RaftClientRequest request;
        private final boolean primary;
        private final LocalStream local;
        private final Set<RemoteStream> remotes;
        private final RaftServer server;
        private final AtomicReference<CompletableFuture<Void>> previous = new AtomicReference<>(CompletableFuture.completedFuture(null));

        StreamInfo(RaftClientRequest raftClientRequest, boolean z, CompletableFuture<StateMachine.DataStream> completableFuture, RaftServer raftServer, CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> checkedBiFunction, Function<NettyServerStreamRpcMetrics.RequestType, NettyServerStreamRpcMetrics.RequestMetrics> function) throws IOException {
            this.request = raftClientRequest;
            this.primary = z;
            this.local = new LocalStream(completableFuture, function.apply(NettyServerStreamRpcMetrics.RequestType.LOCAL_WRITE));
            this.server = raftServer;
            this.remotes = (Set) checkedBiFunction.apply(raftClientRequest, getSuccessors(raftServer.getId())).stream().map(dataStreamOutputRpc -> {
                return new RemoteStream(dataStreamOutputRpc, (NettyServerStreamRpcMetrics.RequestMetrics) function.apply(NettyServerStreamRpcMetrics.RequestType.REMOTE_WRITE));
            }).collect(Collectors.toSet());
        }

        AtomicReference<CompletableFuture<Void>> getPrevious() {
            return this.previous;
        }

        RaftClientRequest getRequest() {
            return this.request;
        }

        RaftServer.Division getDivision() throws IOException {
            return this.server.getDivision(this.request.getRaftGroupId());
        }

        Collection<RaftProtos.CommitInfoProto> getCommitInfos() {
            try {
                return getDivision().getCommitInfos();
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }

        boolean isPrimary() {
            return this.primary;
        }

        LocalStream getLocal() {
            return this.local;
        }

        <T> List<T> applyToRemotes(Function<RemoteStream, T> function) {
            return this.remotes.isEmpty() ? Collections.emptyList() : (List) this.remotes.stream().map(function).collect(Collectors.toList());
        }

        public String toString() {
            return JavaUtils.getClassSimpleName(getClass()) + ":" + this.request;
        }

        private Set<RaftPeer> getSuccessors(RaftPeerId raftPeerId) throws IOException {
            RaftConfiguration raftConf = getDivision().getRaftConf();
            RoutingTable routingTable = this.request.getRoutingTable();
            if (routingTable == null) {
                return isPrimary() ? (Set) raftConf.getCurrentPeers().stream().filter(raftPeer -> {
                    return !raftPeer.getId().equals(this.server.getId());
                }).collect(Collectors.toSet()) : Collections.emptySet();
            }
            Stream<RaftPeerId> stream = routingTable.getSuccessors(raftPeerId).stream();
            raftConf.getClass();
            return (Set) stream.map(raftPeerId2 -> {
                return raftConf.getPeer(raftPeerId2, new RaftProtos.RaftPeerRole[0]);
            }).collect(Collectors.toSet());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/netty/server/DataStreamManagement$StreamMap.class */
    public static class StreamMap {
        private final ConcurrentMap<ClientInvocationId, StreamInfo> map = new ConcurrentHashMap();

        StreamMap() {
        }

        StreamInfo computeIfAbsent(ClientInvocationId clientInvocationId, Function<ClientInvocationId, StreamInfo> function) {
            StreamInfo computeIfAbsent = this.map.computeIfAbsent(clientInvocationId, function);
            DataStreamManagement.LOG.debug("computeIfAbsent({}) returns {}", clientInvocationId, computeIfAbsent);
            return computeIfAbsent;
        }

        StreamInfo get(ClientInvocationId clientInvocationId) {
            StreamInfo streamInfo = this.map.get(clientInvocationId);
            DataStreamManagement.LOG.debug("get({}) returns {}", clientInvocationId, streamInfo);
            return streamInfo;
        }

        StreamInfo remove(ClientInvocationId clientInvocationId) {
            StreamInfo remove = this.map.remove(clientInvocationId);
            DataStreamManagement.LOG.debug("remove({}) returns {}", clientInvocationId, remove);
            return remove;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DataStreamManagement(RaftServer raftServer, NettyServerStreamRpcMetrics nettyServerStreamRpcMetrics) {
        this.server = raftServer;
        this.name = raftServer.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
        RaftProperties properties = raftServer.getProperties();
        boolean asyncRequestThreadPoolCached = RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
        this.requestExecutor = ConcurrentUtils.newThreadPoolWithMax(asyncRequestThreadPoolCached, RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties), this.name + "-request-");
        this.writeExecutor = ConcurrentUtils.newThreadPoolWithMax(asyncRequestThreadPoolCached, RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties), this.name + "-write-");
        this.nettyServerStreamRpcMetrics = nettyServerStreamRpcMetrics;
    }

    private CompletableFuture<StateMachine.DataStream> computeDataStreamIfAbsent(RaftClientRequest raftClientRequest) throws IOException {
        RaftServer.Division division = this.server.getDivision(raftClientRequest.getRaftGroupId());
        ClientInvocationId valueOf = ClientInvocationId.valueOf(raftClientRequest);
        MemoizedSupplier memoize = JavaUtils.memoize(() -> {
            NettyServerStreamRpcMetrics.RequestMetrics newRequestMetrics = getMetrics().newRequestMetrics(NettyServerStreamRpcMetrics.RequestType.STATE_MACHINE_STREAM);
            NettyServerStreamRpcMetrics.RequestContext start = newRequestMetrics.start();
            return division.getStateMachine().data().stream(raftClientRequest).whenComplete((dataStream, th) -> {
                newRequestMetrics.stop(start, th == null);
            });
        });
        CompletableFuture<StateMachine.DataStream> computeIfAbsent = division.getDataStreamMap().computeIfAbsent(valueOf, clientInvocationId -> {
            return (CompletableFuture) memoize.get();
        });
        if (memoize.isInitialized()) {
            return computeIfAbsent;
        }
        throw new AlreadyExistsException("A DataStream already exists for " + valueOf);
    }

    private StreamInfo newStreamInfo(ByteBuf byteBuf, CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> checkedBiFunction) {
        try {
            RaftClientRequest raftClientRequest = ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(byteBuf.nioBuffer()));
            boolean equals = this.server.getId().equals(raftClientRequest.getServerId());
            CompletableFuture<StateMachine.DataStream> computeDataStreamIfAbsent = computeDataStreamIfAbsent(raftClientRequest);
            RaftServer raftServer = this.server;
            NettyServerStreamRpcMetrics metrics = getMetrics();
            metrics.getClass();
            return new StreamInfo(raftClientRequest, equals, computeDataStreamIfAbsent, raftServer, checkedBiFunction, metrics::newRequestMetrics);
        } catch (Throwable th) {
            throw new CompletionException(th);
        }
    }

    static <T> CompletableFuture<T> composeAsync(AtomicReference<CompletableFuture<T>> atomicReference, Executor executor, Function<T, CompletableFuture<T>> function) {
        return atomicReference.updateAndGet(completableFuture -> {
            return completableFuture.thenComposeAsync(function, executor);
        });
    }

    static CompletableFuture<Long> writeToAsync(ByteBuf byteBuf, Iterable<WriteOption> iterable, StateMachine.DataStream dataStream, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            return Long.valueOf(writeTo(byteBuf, iterable, dataStream));
        }, (Executor) Optional.ofNullable(dataStream.getExecutor()).orElse(executor));
    }

    static long writeTo(ByteBuf byteBuf, Iterable<WriteOption> iterable, StateMachine.DataStream dataStream) {
        StateMachine.DataChannel dataChannel = dataStream.getDataChannel();
        long j = 0;
        for (ByteBuffer byteBuffer : byteBuf.nioBuffers()) {
            byteBuf.getClass();
            Runnable runnable = byteBuf::retain;
            byteBuf.getClass();
            try {
                j += dataChannel.write(ReferenceCountedObject.wrap(byteBuffer, runnable, byteBuf::release));
            } finally {
                CompletionException completionException = new CompletionException(th);
            }
        }
        if (WriteOption.containsOption(iterable, StandardWriteOption.SYNC)) {
            try {
                dataChannel.force(false);
            } catch (IOException th) {
                throw new CompletionException(th);
            }
        }
        if (WriteOption.containsOption(iterable, StandardWriteOption.CLOSE)) {
            close(dataStream);
        }
        return j;
    }

    static void close(StateMachine.DataStream dataStream) {
        try {
            dataStream.getDataChannel().close();
        } catch (IOException e) {
            throw new CompletionException("Failed to close " + dataStream, e);
        }
    }

    static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(DataStreamRequestByteBuf dataStreamRequestByteBuf, RaftClientReply raftClientReply, long j, Collection<RaftProtos.CommitInfoProto> collection) {
        return DataStreamReplyByteBuffer.newBuilder().setDataStreamPacket(dataStreamRequestByteBuf).setBuffer(ClientProtoUtils.toRaftClientReplyProto(raftClientReply).toByteString().asReadOnlyByteBuffer()).setSuccess(raftClientReply.isSuccess()).setBytesWritten(j).setCommitInfos(collection).build();
    }

    static void sendReply(List<CompletableFuture<DataStreamReply>> list, DataStreamRequestByteBuf dataStreamRequestByteBuf, long j, Collection<RaftProtos.CommitInfoProto> collection, ChannelHandlerContext channelHandlerContext) {
        boolean checkSuccessRemoteWrite = checkSuccessRemoteWrite(list, j, dataStreamRequestByteBuf);
        DataStreamReplyByteBuffer.Builder commitInfos = DataStreamReplyByteBuffer.newBuilder().setDataStreamPacket(dataStreamRequestByteBuf).setSuccess(checkSuccessRemoteWrite).setCommitInfos(collection);
        if (checkSuccessRemoteWrite) {
            commitInfos.setBytesWritten(j);
        }
        channelHandlerContext.writeAndFlush(commitInfos.build());
    }

    private CompletableFuture<RaftClientReply> startTransaction(StreamInfo streamInfo, DataStreamRequestByteBuf dataStreamRequestByteBuf, long j, ChannelHandlerContext channelHandlerContext) {
        NettyServerStreamRpcMetrics.RequestMetrics newRequestMetrics = getMetrics().newRequestMetrics(NettyServerStreamRpcMetrics.RequestType.START_TRANSACTION);
        NettyServerStreamRpcMetrics.RequestContext start = newRequestMetrics.start();
        try {
            return ((AsyncRpcApi) this.server.getDivision(streamInfo.getRequest().getRaftGroupId()).getRaftClient().async()).sendForward(streamInfo.request).whenCompleteAsync((raftClientReply, th) -> {
                newRequestMetrics.stop(start, th == null);
                if (th != null) {
                    replyDataStreamException(this.server, th, streamInfo.getRequest(), dataStreamRequestByteBuf, channelHandlerContext);
                } else {
                    channelHandlerContext.writeAndFlush(newDataStreamReplyByteBuffer(dataStreamRequestByteBuf, raftClientReply, j, streamInfo.getCommitInfos()));
                }
            }, this.requestExecutor);
        } catch (IOException e) {
            throw new CompletionException(e);
        }
    }

    static void replyDataStreamException(RaftServer raftServer, Throwable th, RaftClientRequest raftClientRequest, DataStreamRequestByteBuf dataStreamRequestByteBuf, ChannelHandlerContext channelHandlerContext) {
        sendDataStreamException(th, dataStreamRequestByteBuf, RaftClientReply.newBuilder().setRequest(raftClientRequest).setException(new DataStreamException(raftServer.getId(), th)).build(), channelHandlerContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void replyDataStreamException(Throwable th, DataStreamRequestByteBuf dataStreamRequestByteBuf, ChannelHandlerContext channelHandlerContext) {
        sendDataStreamException(th, dataStreamRequestByteBuf, RaftClientReply.newBuilder().setClientId(ClientId.emptyClientId()).setServerId(this.server.getId()).setGroupId(RaftGroupId.emptyGroupId()).setException(new DataStreamException(this.server.getId(), th)).build(), channelHandlerContext);
    }

    static void sendDataStreamException(Throwable th, DataStreamRequestByteBuf dataStreamRequestByteBuf, RaftClientReply raftClientReply, ChannelHandlerContext channelHandlerContext) {
        LOG.warn("Failed to process {}", dataStreamRequestByteBuf, th);
        try {
            channelHandlerContext.writeAndFlush(newDataStreamReplyByteBuffer(dataStreamRequestByteBuf, raftClientReply, 0L, null));
        } catch (Throwable th2) {
            LOG.warn("Failed to sendDataStreamException {} for {}", th, dataStreamRequestByteBuf, th2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void read(DataStreamRequestByteBuf dataStreamRequestByteBuf, ChannelHandlerContext channelHandlerContext, CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> checkedBiFunction) {
        LOG.debug("{}: read {}", this, dataStreamRequestByteBuf);
        ByteBuf slice = dataStreamRequestByteBuf.slice();
        try {
            readImpl(dataStreamRequestByteBuf, channelHandlerContext, slice, checkedBiFunction);
        } catch (Throwable th) {
            replyDataStreamException(th, dataStreamRequestByteBuf, channelHandlerContext);
            slice.release();
        }
    }

    private void readImpl(DataStreamRequestByteBuf dataStreamRequestByteBuf, ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> checkedBiFunction) {
        StreamInfo streamInfo;
        CompletableFuture<Long> write;
        List applyToRemotes;
        boolean contains = dataStreamRequestByteBuf.getWriteOptionList().contains(StandardWriteOption.CLOSE);
        ClientInvocationId valueOf = ClientInvocationId.valueOf(dataStreamRequestByteBuf.getClientId(), dataStreamRequestByteBuf.getStreamId());
        if (dataStreamRequestByteBuf.getType() == RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_HEADER) {
            MemoizedSupplier memoize = JavaUtils.memoize(() -> {
                return newStreamInfo(byteBuf, checkedBiFunction);
            });
            streamInfo = this.streams.computeIfAbsent(valueOf, clientInvocationId -> {
                return (StreamInfo) memoize.get();
            });
            if (!memoize.isInitialized()) {
                this.streams.remove(valueOf);
                throw new IllegalStateException("Failed to create a new stream for " + dataStreamRequestByteBuf + " since a stream already exists Key: " + valueOf + " StreamInfo:" + streamInfo);
            }
            getMetrics().onRequestCreate(NettyServerStreamRpcMetrics.RequestType.HEADER);
        } else {
            streamInfo = contains ? (StreamInfo) Optional.ofNullable(this.streams.remove(valueOf)).orElseThrow(() -> {
                return new IllegalStateException("Failed to remove StreamInfo for " + dataStreamRequestByteBuf);
            }) : (StreamInfo) Optional.ofNullable(this.streams.get(valueOf)).orElseThrow(() -> {
                this.streams.remove(valueOf);
                return new IllegalStateException("Failed to get StreamInfo for " + dataStreamRequestByteBuf);
            });
        }
        if (dataStreamRequestByteBuf.getType() == RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_HEADER) {
            write = CompletableFuture.completedFuture(0L);
            applyToRemotes = Collections.emptyList();
        } else {
            if (dataStreamRequestByteBuf.getType() != RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA) {
                throw new IllegalStateException(this + ": Unexpected type " + dataStreamRequestByteBuf.getType() + ", request=" + dataStreamRequestByteBuf);
            }
            write = streamInfo.getLocal().write(byteBuf, dataStreamRequestByteBuf.getWriteOptionList(), this.writeExecutor);
            applyToRemotes = streamInfo.applyToRemotes(remoteStream -> {
                return remoteStream.write(dataStreamRequestByteBuf, this.requestExecutor);
            });
        }
        List list = applyToRemotes;
        CompletableFuture<Long> completableFuture = write;
        StreamInfo streamInfo2 = streamInfo;
        CompletableFuture composeAsync = composeAsync(streamInfo.getPrevious(), this.requestExecutor, r16 -> {
            return JavaUtils.allOf(list).thenCombineAsync((CompletionStage) completableFuture, (r13, l) -> {
                if (dataStreamRequestByteBuf.getType() == RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_HEADER || (dataStreamRequestByteBuf.getType() == RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA && !contains)) {
                    sendReply(list, dataStreamRequestByteBuf, l.longValue(), streamInfo2.getCommitInfos(), channelHandlerContext);
                    return null;
                }
                if (!contains) {
                    throw new IllegalStateException(this + ": Unexpected type " + dataStreamRequestByteBuf.getType() + ", request=" + dataStreamRequestByteBuf);
                }
                if (streamInfo2.isPrimary()) {
                    startTransaction(streamInfo2, dataStreamRequestByteBuf, l.longValue(), channelHandlerContext);
                    return null;
                }
                sendReply(list, dataStreamRequestByteBuf, l.longValue(), streamInfo2.getCommitInfos(), channelHandlerContext);
                return null;
            }, this.requestExecutor);
        });
        StreamInfo streamInfo3 = streamInfo;
        composeAsync.whenComplete((r12, th) -> {
            if (th != null) {
                try {
                    this.streams.remove(valueOf);
                    replyDataStreamException(this.server, th, streamInfo3.getRequest(), dataStreamRequestByteBuf, channelHandlerContext);
                } finally {
                    byteBuf.release();
                }
            }
        });
    }

    static void assertReplyCorrespondingToRequest(DataStreamRequestByteBuf dataStreamRequestByteBuf, DataStreamReply dataStreamReply) {
        Preconditions.assertTrue(dataStreamRequestByteBuf.getClientId().equals(dataStreamReply.getClientId()));
        Preconditions.assertTrue(dataStreamRequestByteBuf.getType() == dataStreamReply.getType());
        Preconditions.assertTrue(dataStreamRequestByteBuf.getStreamId() == dataStreamReply.getStreamId());
        Preconditions.assertTrue(dataStreamRequestByteBuf.getStreamOffset() == dataStreamReply.getStreamOffset());
    }

    static boolean checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> list, long j, DataStreamRequestByteBuf dataStreamRequestByteBuf) {
        Iterator<CompletableFuture<DataStreamReply>> it = list.iterator();
        while (it.hasNext()) {
            DataStreamReply join = it.next().join();
            assertReplyCorrespondingToRequest(dataStreamRequestByteBuf, join);
            if (!join.isSuccess()) {
                LOG.warn("reply is not success, request: {}", dataStreamRequestByteBuf);
                return false;
            }
            if (join.getBytesWritten() != j) {
                LOG.warn("reply written bytes not match, local size: {} remote size: {} request: {}", Long.valueOf(j), Long.valueOf(join.getBytesWritten()), dataStreamRequestByteBuf);
                return false;
            }
        }
        return true;
    }

    NettyServerStreamRpcMetrics getMetrics() {
        return this.nettyServerStreamRpcMetrics;
    }

    public String toString() {
        return this.name;
    }
}
