package org.apache.ratis.netty.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.netty.client.NettyClientReplies;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.security.TlsConf;
import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.Channel;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/ratis/netty/client/NettyClientStreamRpc.class
 */
/* loaded from: input_file:ratis-netty-3.1.0.jar:org/apache/ratis/netty/client/NettyClientStreamRpc.class */
public class NettyClientStreamRpc implements DataStreamClientRpc {
    /** Display name, built in the constructor from the simple class name, "->" and the peer id; returned by toString(). */
    private final String name;
    /** The connection to the peer's data stream address; used to obtain the channel and closed in close(). */
    private final Connection connection;
    /** Timeout read from RaftClientConfigKeys.DataStream.requestTimeout; applied to requests without the CLOSE option. */
    private final TimeDuration requestTimeout;
    /** Twice the request timeout (see constructor); applied to requests carrying the CLOSE option. */
    private final TimeDuration closeTimeout;
    /** Request-count threshold passed to OutstandingRequests.shouldFlush when deciding whether to flush. */
    private final int flushRequestCountMin;
    /** Byte-size threshold passed to OutstandingRequests.shouldFlush when deciding whether to flush. */
    private final SizeInBytes flushRequestBytesMin;
    /** Logger shared by this class and its nested classes. */
    public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
    // The four encoder instances below are created once and added to every channel pipeline
    // built by newChannelInitializer; their classes are annotated @ChannelHandler.Sharable.
    static final MessageToMessageEncoder<DataStreamRequestByteBuffer> ENCODER = new Encoder();
    static final MessageToMessageEncoder<DataStreamRequestByteBuf> ENCODER_BYTE_BUF = new EncoderByteBuf();
    static final MessageToMessageEncoder<DataStreamRequestFilePositionCount> ENCODER_FILE_POSITION_COUNT = new EncoderFilePositionCount();
    static final MessageToMessageEncoder<ByteBuffer> ENCODER_BYTE_BUFFER = new EncoderByteBuffer();
    /** Reply bookkeeping; reply maps are looked up by ClientInvocationId in streamAsync and in the inbound handler. */
    private final NettyClientReplies replies = new NettyClientReplies();
    /** Tracks the count and bytes of requests written since the last flush. */
    private final OutstandingRequests outstandingRequests = new OutstandingRequests();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/netty/client/NettyClientStreamRpc$Connection.class
     */
    /* loaded from: input_file:ratis-netty-3.1.0.jar:org/apache/ratis/netty/client/NettyClientStreamRpc$Connection.class */
    public static class Connection {
        static final TimeDuration RECONNECT = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
        private final InetSocketAddress address;
        private final WorkerGroupGetter workerGroup;
        private final Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier;
        private final AtomicReference<MemoizedSupplier<ChannelFuture>> ref = new AtomicReference<>(MemoizedSupplier.valueOf(this::connect));

        Connection(InetSocketAddress inetSocketAddress, WorkerGroupGetter workerGroupGetter, Supplier<ChannelInitializer<SocketChannel>> supplier) {
            this.address = inetSocketAddress;
            this.workerGroup = workerGroupGetter;
            this.channelInitializerSupplier = supplier;
        }

        ChannelFuture getChannelFuture() {
            Supplier supplier = this.ref.get();
            if (supplier != null) {
                return (ChannelFuture) supplier.get();
            }
            return null;
        }

        Channel getChannelUninterruptibly() {
            ChannelFuture channelFuture = getChannelFuture();
            if (channelFuture == null) {
                return null;
            }
            Channel channel = channelFuture.syncUninterruptibly().channel();
            if (channel.isActive()) {
                return channel;
            }
            ChannelFuture reconnect = reconnect();
            if (reconnect == null) {
                return null;
            }
            return reconnect.syncUninterruptibly().channel();
        }

        private EventLoopGroup getWorkerGroup() {
            return this.workerGroup.get();
        }

        private ChannelFuture connect() {
            if (isClosed()) {
                return null;
            }
            return new Bootstrap().group(getWorkerGroup()).channel(NettyUtils.getSocketChannelClass(getWorkerGroup())).handler(this.channelInitializerSupplier.get()).option(ChannelOption.SO_KEEPALIVE, true).connect(this.address).addListener(new ChannelFutureListener() { // from class: org.apache.ratis.netty.client.NettyClientStreamRpc.Connection.1
                public void operationComplete(ChannelFuture channelFuture) {
                    if (channelFuture.isSuccess()) {
                        NettyClientStreamRpc.LOG.trace("{} succeed.", Connection.this);
                    } else {
                        Connection.this.scheduleReconnect(Connection.this + " failed", channelFuture.cause());
                    }
                }
            });
        }

        void scheduleReconnect(String str, Throwable th) {
            if (isClosed()) {
                return;
            }
            NettyClientStreamRpc.LOG.warn("{}: {}; schedule reconnecting to {} in {}", new Object[]{this, str, this.address, RECONNECT});
            if (th != null) {
                NettyClientStreamRpc.LOG.warn("", th);
            }
            getWorkerGroup().schedule(this::reconnect, RECONNECT.getDuration(), RECONNECT.getUnit());
        }

        private synchronized ChannelFuture reconnect() {
            ChannelFuture channelFuture = getChannelFuture();
            if (channelFuture != null && channelFuture.syncUninterruptibly().channel().isActive()) {
                return channelFuture;
            }
            MemoizedSupplier valueOf = MemoizedSupplier.valueOf(() -> {
                return MemoizedSupplier.valueOf(this::connect);
            });
            MemoizedSupplier<ChannelFuture> andUpdate = this.ref.getAndUpdate(memoizedSupplier -> {
                if (memoizedSupplier == null) {
                    return null;
                }
                return (MemoizedSupplier) valueOf.get();
            });
            if (andUpdate != null && andUpdate.isInitialized()) {
                ((ChannelFuture) andUpdate.get()).channel().close();
            }
            return getChannelFuture();
        }

        void close() {
            MemoizedSupplier<ChannelFuture> andSet = this.ref.getAndSet(null);
            if (andSet == null || !andSet.isInitialized()) {
                this.workerGroup.shutdownGracefully();
            } else {
                ((ChannelFuture) andSet.get()).channel().close().addListener(future -> {
                    this.workerGroup.shutdownGracefully();
                });
            }
        }

        boolean isClosed() {
            return this.ref.get() == null;
        }

        public String toString() {
            return JavaUtils.getClassSimpleName(getClass()) + "-" + this.address;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/netty/client/NettyClientStreamRpc$Encoder.class
     */
    @ChannelHandler.Sharable
    /* loaded from: input_file:ratis-netty-3.1.0.jar:org/apache/ratis/netty/client/NettyClientStreamRpc$Encoder.class */
    static class Encoder extends MessageToMessageEncoder<DataStreamRequestByteBuffer> {
        Encoder() {
        }

        protected void encode(ChannelHandlerContext channelHandlerContext, DataStreamRequestByteBuffer dataStreamRequestByteBuffer, List<Object> list) {
            list.getClass();
            NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(dataStreamRequestByteBuffer, list::add, channelHandlerContext.alloc());
        }

        protected /* bridge */ /* synthetic */ void encode(ChannelHandlerContext channelHandlerContext, Object obj, List list) throws Exception {
            encode(channelHandlerContext, (DataStreamRequestByteBuffer) obj, (List<Object>) list);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/netty/client/NettyClientStreamRpc$EncoderByteBuf.class
     */
    @ChannelHandler.Sharable
    /* loaded from: input_file:ratis-netty-3.1.0.jar:org/apache/ratis/netty/client/NettyClientStreamRpc$EncoderByteBuf.class */
    static class EncoderByteBuf extends MessageToMessageEncoder<DataStreamRequestByteBuf> {
        EncoderByteBuf() {
        }

        protected void encode(ChannelHandlerContext channelHandlerContext, DataStreamRequestByteBuf dataStreamRequestByteBuf, List<Object> list) {
            list.getClass();
            NettyDataStreamUtils.encodeDataStreamRequestByteBuf(dataStreamRequestByteBuf, list::add, channelHandlerContext.alloc());
        }

        protected /* bridge */ /* synthetic */ void encode(ChannelHandlerContext channelHandlerContext, Object obj, List list) throws Exception {
            encode(channelHandlerContext, (DataStreamRequestByteBuf) obj, (List<Object>) list);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/netty/client/NettyClientStreamRpc$EncoderByteBuffer.class
     */
    @ChannelHandler.Sharable
    /* loaded from: input_file:ratis-netty-3.1.0.jar:org/apache/ratis/netty/client/NettyClientStreamRpc$EncoderByteBuffer.class */
    static class EncoderByteBuffer extends MessageToMessageEncoder<ByteBuffer> {
        EncoderByteBuffer() {
        }

        protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuffer byteBuffer, List<Object> list) {
            list.getClass();
            NettyDataStreamUtils.encodeByteBuffer(byteBuffer, list::add);
        }

        protected /* bridge */ /* synthetic */ void encode(ChannelHandlerContext channelHandlerContext, Object obj, List list) throws Exception {
            encode(channelHandlerContext, (ByteBuffer) obj, (List<Object>) list);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/netty/client/NettyClientStreamRpc$EncoderFilePositionCount.class
     */
    @ChannelHandler.Sharable
    /* loaded from: input_file:ratis-netty-3.1.0.jar:org/apache/ratis/netty/client/NettyClientStreamRpc$EncoderFilePositionCount.class */
    static class EncoderFilePositionCount extends MessageToMessageEncoder<DataStreamRequestFilePositionCount> {
        EncoderFilePositionCount() {
        }

        protected void encode(ChannelHandlerContext channelHandlerContext, DataStreamRequestFilePositionCount dataStreamRequestFilePositionCount, List<Object> list) {
            list.getClass();
            NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(dataStreamRequestFilePositionCount, list::add, channelHandlerContext.alloc());
        }

        protected /* bridge */ /* synthetic */ void encode(ChannelHandlerContext channelHandlerContext, Object obj, List list) throws Exception {
            encode(channelHandlerContext, (DataStreamRequestFilePositionCount) obj, (List<Object>) list);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/netty/client/NettyClientStreamRpc$OutstandingRequests.class
     */
    /* loaded from: input_file:ratis-netty-3.1.0.jar:org/apache/ratis/netty/client/NettyClientStreamRpc$OutstandingRequests.class */
    /**
     * Counters for the requests written since the last flush, used to decide when the
     * next write should also flush.
     */
    static class OutstandingRequests {
        /** Number of requests counted since the last flush decision that returned true. */
        private int count;
        /** Sum of the data lengths counted since the last flush decision that returned true. */
        private long bytes;

        OutstandingRequests() {
        }

        /**
         * Pure decision helper: CLOSE always flushes; with nothing outstanding there is nothing
         * to flush; otherwise flush when either threshold is reached or FLUSH is requested.
         */
        private boolean shouldFlush(List<WriteOption> options, int countMin, SizeInBytes bytesMin) {
            if (options.contains(StandardWriteOption.CLOSE)) {
                return true;
            }
            final boolean nothingOutstanding = this.bytes == 0 && this.count == 0;
            if (nothingOutstanding) {
                return false;
            }
            if (this.count >= countMin) {
                return true;
            }
            if (this.bytes >= bytesMin.getSize()) {
                return true;
            }
            return options.contains(StandardWriteOption.FLUSH);
        }

        /**
         * Accounts for the given request (when non-null) and decides whether to flush.
         * The counters are reset whenever the answer is {@code true}.
         *
         * @param i                 the request-count threshold
         * @param sizeInBytes       the byte-size threshold
         * @param dataStreamRequest the request being written, or {@code null} to only query the counters
         * @return {@code true} when the caller should flush
         */
        synchronized boolean shouldFlush(int i, SizeInBytes sizeInBytes, DataStreamRequest dataStreamRequest) {
            final List<WriteOption> options;
            if (dataStreamRequest != null) {
                options = dataStreamRequest.getWriteOptionList();
                this.count++;
                final long length = dataStreamRequest.getDataLength();
                Preconditions.assertTrue(length >= 0,
                        () -> "length = " + length + " < 0, request: " + dataStreamRequest);
                this.bytes += length;
            } else {
                options = Collections.emptyList();
            }

            final boolean flush = shouldFlush(options, i, sizeInBytes);
            NettyClientStreamRpc.LOG.debug("flush? {}, (count, bytes)=({}, {}), min=({}, {}), request={}, options={}",
                    flush, this.count, this.bytes, i, sizeInBytes, dataStreamRequest, options);
            if (flush) {
                this.count = 0;
                this.bytes = 0L;
            }
            return flush;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/ratis/netty/client/NettyClientStreamRpc$WorkerGroupGetter.class
     */
    /* loaded from: input_file:ratis-netty-3.1.0.jar:org/apache/ratis/netty/client/NettyClientStreamRpc$WorkerGroupGetter.class */
    /**
     * Supplies the {@link EventLoopGroup} used by a connection and knows how to shut it down.
     *
     * <p>When worker-group sharing is enabled in the properties, all instances obtain the same
     * reference-counted group from {@link #SHARED_WORKER_GROUP}; otherwise each instance owns
     * a group of its own.
     */
    public static class WorkerGroupGetter implements Supplier<EventLoopGroup> {
        private static final AtomicReference<CompletableFuture<ReferenceCountedObject<EventLoopGroup>>> SHARED_WORKER_GROUP = new AtomicReference<>();
        private final EventLoopGroup workerGroup;

        /**
         * @param raftProperties the properties deciding whether the group is shared and how it is sized
         * @return a getter over a dedicated group, or over the retained shared group
         */
        static WorkerGroupGetter newInstance(RaftProperties raftProperties) {
            if (!NettyConfigKeys.DataStream.Client.workerGroupShare(raftProperties)) {
                return new WorkerGroupGetter(newWorkerGroup(raftProperties));
            }

            // Install a placeholder future atomically; only the thread whose placeholder
            // won creates the actual group and completes it.
            final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> created = new CompletableFuture<>();
            final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> shared =
                    SHARED_WORKER_GROUP.updateAndGet(existing -> existing != null ? existing : created);
            if (shared == created) {
                created.complete(ReferenceCountedObject.wrap(newWorkerGroup(raftProperties)));
            }

            // The parent's get() is final; the compiler-generated bridge must not be re-declared here.
            return new WorkerGroupGetter(shared.join().retain()) {
                @Override
                void shutdownGracefully() {
                    // NOTE(review): release() is a side effect inside the update function, and
                    // AtomicReference.updateAndGet may re-apply the function under contention —
                    // confirm this cannot release more than once.
                    final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> remaining =
                            SHARED_WORKER_GROUP.updateAndGet(current -> {
                                Preconditions.assertSame(shared, current, "SHARED_WORKER_GROUP");
                                return current.join().release() ? null : current;
                            });
                    if (remaining == null) {
                        get().shutdownGracefully();
                    }
                }
            };
        }

        /** Creates a new event loop group sized and typed according to the given properties. */
        static EventLoopGroup newWorkerGroup(RaftProperties raftProperties) {
            return NettyUtils.newEventLoopGroup(
                    JavaUtils.getClassSimpleName(NettyClientStreamRpc.class) + "-workerGroup",
                    NettyConfigKeys.DataStream.Client.workerGroupSize(raftProperties),
                    NettyConfigKeys.DataStream.Client.useEpoll(raftProperties));
        }

        private WorkerGroupGetter(EventLoopGroup eventLoopGroup) {
            this.workerGroup = eventLoopGroup;
        }

        @Override
        public final EventLoopGroup get() {
            return this.workerGroup;
        }

        /** Shuts the group down; the shared variant only does so once its release() reports true. */
        void shutdownGracefully() {
            this.workerGroup.shutdownGracefully();
        }
    }

    /**
     * Creates a data stream client RPC for the given peer.
     *
     * @param raftPeer       the peer whose id and data stream address are used
     * @param tlsConf        the TLS configuration handed to {@link NettyUtils#buildSslContextForClient}
     * @param raftProperties the properties supplying timeouts, flush thresholds and worker group settings
     */
    public NettyClientStreamRpc(RaftPeer raftPeer, TlsConf tlsConf, RaftProperties raftProperties) {
        this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + raftPeer.getId();

        final TimeDuration timeout = RaftClientConfigKeys.DataStream.requestTimeout(raftProperties);
        this.requestTimeout = timeout;
        this.closeTimeout = timeout.multiply(2.0d);

        this.flushRequestCountMin = RaftClientConfigKeys.DataStream.flushRequestCountMin(raftProperties);
        this.flushRequestBytesMin = RaftClientConfigKeys.DataStream.flushRequestBytesMin(raftProperties);

        final InetSocketAddress peerAddress = NetUtils.createSocketAddr(raftPeer.getDataStreamAddress());
        final SslContext clientSslContext = NettyUtils.buildSslContextForClient(tlsConf);
        final WorkerGroupGetter workerGroupGetter = WorkerGroupGetter.newInstance(raftProperties);
        // A new initializer (and a new inbound handler) is built each time the supplier is invoked.
        this.connection = new Connection(peerAddress, workerGroupGetter,
                () -> newChannelInitializer(peerAddress, clientSslContext, getClientHandler()));
    }

    /**
     * Builds the inbound handler that routes each {@link DataStreamReply} to the reply map of
     * its invocation, closes the channel on exceptions and schedules a reconnect when the
     * channel becomes inactive.
     */
    private ChannelInboundHandler getClientHandler() {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                if (obj instanceof DataStreamReply) {
                    final DataStreamReply reply = (DataStreamReply) obj;
                    NettyClientStreamRpc.LOG.debug("{}: read {}", NettyClientStreamRpc.this.name, reply);

                    final ClientInvocationId invocationId =
                            ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
                    final NettyClientReplies.ReplyMap map = NettyClientStreamRpc.this.replies.getReplyMap(invocationId);
                    if (map == null) {
                        NettyClientStreamRpc.LOG.error("{}: {} replyMap not found for reply: {}",
                                NettyClientStreamRpc.this.name, invocationId, reply);
                        return;
                    }
                    try {
                        map.receiveReply(reply);
                    } catch (Throwable cause) {
                        // Any failure while delivering the reply fails the whole reply map.
                        NettyClientStreamRpc.LOG.warn(NettyClientStreamRpc.this.name + ": channelRead error:", cause);
                        map.completeExceptionally(cause);
                    }
                } else {
                    NettyClientStreamRpc.LOG.error("{}: unexpected message {}", NettyClientStreamRpc.this.name, obj.getClass());
                }
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
                NettyClientStreamRpc.LOG.warn(NettyClientStreamRpc.this.name + ": exceptionCaught", th);
                channelHandlerContext.close();
            }

            @Override
            public void channelInactive(ChannelHandlerContext channelHandlerContext) {
                NettyClientStreamRpc.this.connection.scheduleReconnect("channel is inactive", null);
            }
        };
    }

    /**
     * Creates the initializer that assembles the client pipeline: an optional "ssl" handler,
     * the four shared encoders, a fresh decoder and finally the given inbound handler.
     *
     * @param inetSocketAddress     the peer address, used for the SSL handler's host and port
     * @param sslContext            the SSL context, or {@code null} to skip the SSL handler
     * @param channelInboundHandler the handler added last to the pipeline
     */
    static ChannelInitializer<SocketChannel> newChannelInitializer(final InetSocketAddress inetSocketAddress, final SslContext sslContext, final ChannelInboundHandler channelInboundHandler) {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel socketChannel) {
                final ChannelPipeline p = socketChannel.pipeline();
                if (sslContext != null) {
                    p.addLast("ssl", sslContext.newHandler(
                            socketChannel.alloc(),
                            inetSocketAddress.getHostName(),
                            inetSocketAddress.getPort()));
                }
                // Handlers are added one at a time, in this exact order.
                p.addLast(NettyClientStreamRpc.ENCODER)
                        .addLast(NettyClientStreamRpc.ENCODER_FILE_POSITION_COUNT)
                        .addLast(NettyClientStreamRpc.ENCODER_BYTE_BUFFER)
                        .addLast(NettyClientStreamRpc.ENCODER_BYTE_BUF)
                        .addLast(NettyClientStreamRpc.newDecoder())
                        .addLast(channelInboundHandler);
            }
        };
    }

    /**
     * Creates a decoder that uses the composite cumulator and emits the reply decoded by
     * {@link NettyDataStreamUtils#decodeDataStreamReplyByteBuffer} whenever that call returns
     * a non-null value.
     */
    static ByteToMessageDecoder newDecoder() {
        return new ByteToMessageDecoder() {
            {
                setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
            }

            @Override
            protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
                // A null decode result adds nothing to the output list.
                Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamReplyByteBuffer(byteBuf))
                        .ifPresent(list::add);
            }
        };
    }

    /**
     * Writes the request to the channel and returns a future for its reply.
     *
     * <p>The returned future is completed exceptionally with an {@link AlreadyClosedException}
     * when no channel is available, with an {@link IOException} when the channel write fails,
     * and with a {@link TimeoutIOException} when it is still not done once the timeout elapses
     * after a successful write (the close timeout for CLOSE requests, the request timeout otherwise).
     *
     * @param dataStreamRequest the request to send
     * @return the future of the reply
     */
    @Override
    public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest dataStreamRequest) {
        final CompletableFuture<DataStreamReply> replyFuture = new CompletableFuture<>();
        final ClientInvocationId invocationId =
                ClientInvocationId.valueOf(dataStreamRequest.getClientId(), dataStreamRequest.getStreamId());
        final boolean isClose = dataStreamRequest.getWriteOptionList().contains(StandardWriteOption.CLOSE);
        final NettyClientReplies.ReplyMap replyMap = this.replies.getReplyMap(invocationId);
        final NettyClientReplies.RequestEntry requestEntry = new NettyClientReplies.RequestEntry(dataStreamRequest);
        LOG.debug("{}: write begin {}", this, dataStreamRequest);

        // Submitting to the reply map and writing to the channel happen under the same lock.
        synchronized (replyMap) {
            final Channel channel = this.connection.getChannelUninterruptibly();
            if (channel == null) {
                replyFuture.completeExceptionally(new AlreadyClosedException(this + ": Failed to send " + dataStreamRequest));
                return replyFuture;
            }

            final NettyClientReplies.ReplyEntry replyEntry = replyMap.submitRequest(requestEntry, isClose, replyFuture);
            final boolean flush = this.outstandingRequests.shouldFlush(
                    this.flushRequestCountMin, this.flushRequestBytesMin, dataStreamRequest);
            final Function<DataStreamRequest, ChannelFuture> writeMethod =
                    flush ? channel::writeAndFlush : channel::write;

            writeMethod.apply(dataStreamRequest).addListener(future -> {
                if (future.isSuccess()) {
                    LOG.debug("{}: write after {}", this, dataStreamRequest);
                    final TimeDuration timeout = isClose ? this.closeTimeout : this.requestTimeout;
                    // The timeout only starts once the write itself has succeeded.
                    replyEntry.scheduleTimeout(() -> channel.eventLoop().schedule(() -> {
                        if (replyFuture.isDone()) {
                            return;
                        }
                        replyFuture.completeExceptionally(new TimeoutIOException(
                                "Timeout " + timeout + ": Failed to send " + dataStreamRequest + " via channel " + channel));
                        replyMap.fail(requestEntry);
                    }, timeout.getDuration(), timeout.getUnit()));
                } else {
                    final IOException failure = new IOException(
                            this + ": Failed to send " + dataStreamRequest + " to " + channel.remoteAddress(), future.cause());
                    replyFuture.completeExceptionally(failure);
                    replyMap.fail(requestEntry);
                    LOG.error("Channel write failed", failure);
                }
            });
            return replyFuture;
        }
    }

    /**
     * Closes the connection. When requests are still outstanding, an empty buffer is written
     * and flushed first and the connection is closed once that write completes; if no channel
     * can be obtained in that case, nothing further is done here.
     */
    @Override
    public void close() {
        final boolean flush = this.outstandingRequests.shouldFlush(0, SizeInBytes.ZERO, (DataStreamRequest) null);
        if (!flush) {
            this.connection.close();
            return;
        }

        final Channel channel = this.connection.getChannelUninterruptibly();
        if (channel == null) {
            return;
        }
        final ChannelFuture flushed = channel.writeAndFlush(DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
        if (flushed != null) {
            // Close regardless of whether the final flush succeeded.
            flushed.addListener(future -> this.connection.close());
        }
    }

    /** @return the display name assigned in the constructor. */
    @Override
    public String toString() {
        return name;
    }
}
