package io.atomix.cluster.messaging.impl;

import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.ProtocolReply;
import io.atomix.utils.concurrent.OrderedFuture;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.util.StringUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.compression.SnappyFrameDecoder;
import io.netty.handler.codec.compression.SnappyFrameEncoder;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import java.net.ConnectException;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/cluster/messaging/impl/NettyMessagingService.class */
public final class NettyMessagingService implements ManagedMessagingService {
    /** Timeout used by the sendAndReceive overloads that are not given an explicit Duration. */
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5);
    /** Protocol name handed to SslContextBuilder.protocols for both the client and the server context. */
    private static final String TLS_PROTOCOL = "TLSv1.3";
    private final Logger log;
    /** Address returned by address(); requests addressed to it are served through localConnection. */
    private final Address advertisedAddress;
    /** Addresses filled in by initAddresses and returned by bindingAddresses(). */
    private final Collection<Address> bindingAddresses;
    /** hashCode of the String passed to the constructor; written before, and checked ahead of, the handshake version. */
    private final int preamble;
    /** Version the client side writes when a channel becomes active. */
    private final ProtocolVersion protocolVersion;
    /** Set to true at the end of start() and flipped back by stop(). */
    private final AtomicBoolean started;
    /** Registry the registerHandler/unregisterHandler methods delegate to. */
    private final HandlerRegistry handlers;
    /** Remote client connections keyed by channel; all entries are closed in stop(). */
    private final Map<Channel, RemoteClientConnection> connections;
    /** Incremented once for every outgoing ProtocolRequest. */
    private final AtomicLong messageIdGenerator;
    /** Source of channels for pooled requests. */
    private final ChannelPool channelPool;
    /** Futures of pending remote requests; stop() fails whatever is still in the list. */
    private final List<CompletableFuture> openFutures;
    private final MessagingConfig config;
    // Event loop groups and channel classes are assigned by initEpollTransport / initNioTransport.
    private EventLoopGroup serverGroup;
    private EventLoopGroup clientGroup;
    private Class<? extends ServerChannel> serverChannelClass;
    private Class<? extends Channel> clientChannelClass;
    // Closed first in stop(); NOTE(review): assigned outside the visible section, presumably while bootstrapping the server.
    private Channel serverChannel;
    /** Created in start(); schedules the per-request timeout of sendAndReceive. */
    private ScheduledExecutorService timeoutExecutor;
    /** Created in start(); used instead of a network channel when the destination equals advertisedAddress. */
    private volatile LocalClientConnection localConnection;
    // Built by loadServerSslContext / loadClientSslContext when TLS is enabled.
    private SslContext serverSslContext;
    private SslContext clientSslContext;
    // Instantiated in the constructors; its use is not visible in this section.
    private final MessagingMetrics messagingMetrics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/NettyMessagingService$BasicClientChannelInitializer.class */
    public class BasicClientChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final CompletableFuture<Channel> future;

        BasicClientChannelInitializer(CompletableFuture<Channel> completableFuture) {
            this.future = completableFuture;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) {
            if (NettyMessagingService.this.config.isTlsEnabled()) {
                socketChannel.pipeline().addLast("tls", NettyMessagingService.this.clientSslContext.newHandler(socketChannel.alloc()));
            }
            socketChannel.pipeline().addLast("handshake", new ClientHandshakeHandlerAdapter(this.future));
            switch (NettyMessagingService.this.config.getCompressionAlgorithm()) {
                case GZIP:
                    socketChannel.pipeline().addLast(new ChannelHandler[]{ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)});
                    socketChannel.pipeline().addLast(new ChannelHandler[]{ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)});
                    return;
                case SNAPPY:
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new SnappyFrameEncoder()});
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new SnappyFrameDecoder()});
                    return;
                case NONE:
                    return;
                default:
                    NettyMessagingService.this.log.debug("Unknown compression algorithm. Proceeding without compression.");
                    return;
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            this.future.completeExceptionally(th);
            super.exceptionCaught(channelHandlerContext, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/NettyMessagingService$BasicServerChannelInitializer.class */
    public class BasicServerChannelInitializer extends ChannelInitializer<SocketChannel> {
        private BasicServerChannelInitializer() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) {
            if (NettyMessagingService.this.config.isTlsEnabled()) {
                socketChannel.pipeline().addLast("tls", NettyMessagingService.this.serverSslContext.newHandler(socketChannel.alloc()));
            }
            socketChannel.pipeline().addLast("handshake", new ServerHandshakeHandlerAdapter());
            switch (NettyMessagingService.this.config.getCompressionAlgorithm()) {
                case GZIP:
                    socketChannel.pipeline().addLast(new ChannelHandler[]{ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)});
                    socketChannel.pipeline().addLast(new ChannelHandler[]{ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)});
                    return;
                case SNAPPY:
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new SnappyFrameEncoder()});
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new SnappyFrameDecoder()});
                    return;
                case NONE:
                    return;
                default:
                    NettyMessagingService.this.log.debug("Unknown compression algorithm. Proceeding without compression.");
                    return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/NettyMessagingService$ClientHandshakeHandlerAdapter.class */
    public class ClientHandshakeHandlerAdapter extends HandshakeHandlerAdapter<ProtocolReply> {
        private final CompletableFuture<Channel> future;

        ClientHandshakeHandlerAdapter(CompletableFuture<Channel> completableFuture) {
            super();
            this.future = completableFuture;
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
            NettyMessagingService.this.log.debug("Writing client protocol version {} for connection to {}", NettyMessagingService.this.protocolVersion, channelHandlerContext.channel().remoteAddress());
            writeProtocolVersion(channelHandlerContext, NettyMessagingService.this.protocolVersion);
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            readProtocolVersion(channelHandlerContext, (ByteBuf) obj).ifPresent(i -> {
                ProtocolVersion valueOf = ProtocolVersion.valueOf(i);
                if (valueOf != null) {
                    activateProtocolVersion(channelHandlerContext, NettyMessagingService.this.getOrCreateClientConnection(channelHandlerContext.channel()), valueOf);
                } else {
                    NettyMessagingService.this.log.error("Failed to negotiate protocol version");
                    channelHandlerContext.close();
                }
            });
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            this.future.completeExceptionally(th);
        }

        @Override // io.atomix.cluster.messaging.impl.NettyMessagingService.HandshakeHandlerAdapter
        void activateProtocolVersion(ChannelHandlerContext channelHandlerContext, Connection<ProtocolReply> connection, ProtocolVersion protocolVersion) {
            NettyMessagingService.this.log.debug("Activating client protocol version {} for connection to {}", protocolVersion, channelHandlerContext.channel().remoteAddress());
            super.activateProtocolVersion(channelHandlerContext, connection, protocolVersion);
            this.future.complete(channelHandlerContext.channel());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/NettyMessagingService$HandshakeHandlerAdapter.class */
    /**
     * Shared handshake logic: the wire format is a 4-byte preamble followed by a 2-byte protocol
     * version. Once a version is agreed the handler replaces itself with the protocol handlers.
     */
    public abstract class HandshakeHandlerAdapter<M extends ProtocolMessage> extends ChannelInboundHandlerAdapter {
        private HandshakeHandlerAdapter() {
        }

        /** Writes the preamble and the given version in a single 6-byte buffer and flushes it. */
        void writeProtocolVersion(ChannelHandlerContext channelHandlerContext, ProtocolVersion protocolVersion) {
            final ByteBuf handshake = channelHandlerContext.alloc().buffer(6);
            handshake.writeInt(preamble);
            handshake.writeShort(protocolVersion.version());
            channelHandlerContext.writeAndFlush(handshake);
        }

        /**
         * Reads a handshake from the buffer and always releases the buffer afterwards.
         *
         * @return the version number, or empty (after closing the channel) when the preamble does
         *     not match
         */
        OptionalInt readProtocolVersion(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
            try {
                if (byteBuf.readInt() != preamble) {
                    log.warn("Received invalid handshake, closing connection");
                    channelHandlerContext.close();
                    return OptionalInt.empty();
                }
                return OptionalInt.of(byteBuf.readShort());
            } finally {
                byteBuf.release();
            }
        }

        /** Swaps this handler for the encoder, decoder and dispatcher of the agreed protocol. */
        void activateProtocolVersion(ChannelHandlerContext channelHandlerContext, Connection<M> connection, ProtocolVersion protocolVersion) {
            final MessagingProtocol protocol = protocolVersion.createProtocol(advertisedAddress);
            channelHandlerContext.pipeline().remove(this);
            channelHandlerContext.pipeline().addLast("encoder", protocol.newEncoder());
            channelHandlerContext.pipeline().addLast("decoder", protocol.newDecoder());
            channelHandlerContext.pipeline().addLast("handler", new MessageDispatcher<>(connection));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/NettyMessagingService$MessageDispatcher.class */
    public class MessageDispatcher<M extends ProtocolMessage> extends SimpleChannelInboundHandler<Object> {
        private final Connection<M> connection;

        MessageDispatcher(Connection<M> connection) {
            this.connection = connection;
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            this.connection.close();
            channelHandlerContext.close();
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            NettyMessagingService.this.log.error("Exception inside channel handling pipeline", th);
            this.connection.close();
            channelHandlerContext.close();
        }

        public boolean acceptInboundMessage(Object obj) {
            return obj instanceof ProtocolMessage;
        }

        /* JADX WARN: Multi-variable type inference failed */
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            try {
                this.connection.dispatch((ProtocolMessage) obj);
            } catch (RejectedExecutionException e) {
                NettyMessagingService.this.log.warn("Unable to dispatch message due to {}", e.getMessage());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/cluster/messaging/impl/NettyMessagingService$ServerHandshakeHandlerAdapter.class */
    /**
     * Server half of the handshake: answers the client's version (falling back to the latest
     * version when the requested one is unknown) and activates it.
     */
    public class ServerHandshakeHandlerAdapter extends HandshakeHandlerAdapter<ProtocolRequest> {
        private ServerHandshakeHandlerAdapter() {
            super();
        }

        /** Reads the client's version, replies with the agreed one and installs the protocol. */
        @Override
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            final OptionalInt requested = readProtocolVersion(channelHandlerContext, (ByteBuf) obj);
            if (!requested.isPresent()) {
                // readProtocolVersion has already rejected the handshake.
                return;
            }

            final ProtocolVersion known = ProtocolVersion.valueOf(requested.getAsInt());
            final ProtocolVersion agreed = known != null ? known : ProtocolVersion.latest();
            writeProtocolVersion(channelHandlerContext, agreed);
            activateProtocolVersion(
                channelHandlerContext,
                new RemoteServerConnection(handlers, channelHandlerContext.channel()),
                agreed);
        }

        /** Logs the activation before delegating to the shared implementation. */
        @Override
        void activateProtocolVersion(ChannelHandlerContext channelHandlerContext, Connection<ProtocolRequest> connection, ProtocolVersion protocolVersion) {
            log.debug(
                "Activating server protocol version {} for connection to {}",
                protocolVersion,
                channelHandlerContext.channel().remoteAddress());
            super.activateProtocolVersion(channelHandlerContext, connection, protocolVersion);
        }
    }

    /**
     * Creates a messaging service that speaks the latest protocol version.
     *
     * @param str string whose hash code becomes the handshake preamble
     * @param address address advertised to other members
     * @param messagingConfig messaging configuration
     */
    public NettyMessagingService(String str, Address address, MessagingConfig messagingConfig) {
        this(
            str,
            address,
            messagingConfig,
            ProtocolVersion.latest());
    }

    /**
     * Creates a messaging service with an explicit protocol version and a channel pool sized by
     * {@code messagingConfig.getConnectionPoolSize()}.
     *
     * @param str string whose hash code becomes the handshake preamble
     * @param address address advertised to other members
     * @param messagingConfig messaging configuration
     * @param protocolVersion version written by the client side of the handshake
     */
    NettyMessagingService(String str, Address address, MessagingConfig messagingConfig, ProtocolVersion protocolVersion) {
        // Internal state.
        log = LoggerFactory.getLogger(getClass());
        bindingAddresses = new ArrayList<>();
        started = new AtomicBoolean(false);
        handlers = new HandlerRegistry();
        connections = Maps.newConcurrentMap();
        messageIdGenerator = new AtomicLong(0L);
        messagingMetrics = new MessagingMetricsImpl();

        // Values derived from the arguments.
        preamble = str.hashCode();
        advertisedAddress = address;
        this.protocolVersion = protocolVersion;
        config = messagingConfig;
        channelPool = new ChannelPool(this::openChannel, messagingConfig.getConnectionPoolSize());
        openFutures = new CopyOnWriteArrayList<>();

        initAddresses(messagingConfig);
    }

    /**
     * Creates a messaging service whose channel pool is built by the given factory.
     *
     * @param str string whose hash code becomes the handshake preamble
     * @param address address advertised to other members
     * @param messagingConfig messaging configuration
     * @param protocolVersion version written by the client side of the handshake
     * @param function receives this service's channel opener and returns the pool to use
     */
    NettyMessagingService(String str, Address address, MessagingConfig messagingConfig, ProtocolVersion protocolVersion, Function<Function<Address, CompletableFuture<Channel>>, ChannelPool> function) {
        // Internal state.
        log = LoggerFactory.getLogger(getClass());
        bindingAddresses = new ArrayList<>();
        started = new AtomicBoolean(false);
        handlers = new HandlerRegistry();
        connections = Maps.newConcurrentMap();
        messageIdGenerator = new AtomicLong(0L);
        messagingMetrics = new MessagingMetricsImpl();

        // Values derived from the arguments.
        preamble = str.hashCode();
        advertisedAddress = address;
        this.protocolVersion = protocolVersion;
        config = messagingConfig;
        channelPool = function.apply(this::openChannel);
        openFutures = new CopyOnWriteArrayList<>();

        initAddresses(messagingConfig);
    }

    /**
     * Fills {@code bindingAddresses}: one address per configured interface, or the advertised host
     * when no interface is configured. The configured port wins over the advertised port.
     */
    private void initAddresses(MessagingConfig messagingConfig) {
        final int port;
        if (messagingConfig.getPort() == null) {
            port = advertisedAddress.port();
        } else {
            port = messagingConfig.getPort().intValue();
        }

        if (!messagingConfig.getInterfaces().isEmpty()) {
            final List<Address> perInterface = messagingConfig.getInterfaces().stream()
                .map(networkInterface -> Address.from(networkInterface, port))
                .collect(Collectors.toList());
            bindingAddresses.addAll(perInterface);
            return;
        }
        bindingAddresses.add(Address.from(advertisedAddress.host(), port));
    }

    /** Returns the address this service advertises. */
    @Override
    public Address address() {
        return advertisedAddress;
    }

    /** Returns the internal collection of addresses computed by {@code initAddresses}. */
    @Override
    public Collection<Address> bindingAddresses() {
        return bindingAddresses;
    }

    /**
     * Sends a one-way message over a pooled connection.
     *
     * @param address destination
     * @param str message subject
     * @param bArr message payload
     * @param z not consulted by this implementation
     * @return future completed when the send has been carried out
     */
    @Override
    public CompletableFuture<Void> sendAsync(Address address, String str, byte[] bArr, boolean z) {
        final long messageId = messageIdGenerator.incrementAndGet();
        final ProtocolRequest request = new ProtocolRequest(messageId, advertisedAddress, str, bArr);
        return executeOnPooledConnection(
            address,
            str,
            connection -> connection.sendAsync(request),
            MoreExecutors.directExecutor());
    }

    /** Request/reply with the default timeout; the reply is completed on a direct executor. */
    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String str, byte[] bArr, boolean z) {
        final Executor direct = MoreExecutors.directExecutor();
        return sendAndReceive(address, str, bArr, z, DEFAULT_TIMEOUT, direct);
    }

    /** Request/reply with the default timeout; the reply is completed on the given executor. */
    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String str, byte[] bArr, boolean z, Executor executor) {
        return sendAndReceive(
            address,
            str,
            bArr,
            z,
            DEFAULT_TIMEOUT,
            executor);
    }

    /** Request/reply with an explicit timeout; the reply is completed on a direct executor. */
    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String str, byte[] bArr, boolean z, Duration duration) {
        final Executor direct = MoreExecutors.directExecutor();
        return sendAndReceive(address, str, bArr, z, duration, direct);
    }

    /**
     * Sends a request and returns a future for the reply.
     *
     * @param address destination
     * @param str message subject
     * @param bArr request payload
     * @param z {@code true} to use a pooled connection, {@code false} for a transient one
     * @param duration time after which the returned future fails with a {@link TimeoutException}
     * @param executor executor on which the returned future is completed
     * @return the reply future; already failed with {@link IllegalStateException} when the service
     *     is not started
     */
    @Override
    public CompletableFuture<byte[]> sendAndReceive(Address address, String str, byte[] bArr, boolean z, Duration duration, Executor executor) {
        if (!started.get()) {
            return CompletableFuture.failedFuture(new IllegalStateException("MessagingService is closed."));
        }

        final ProtocolRequest request = new ProtocolRequest(messageIdGenerator.incrementAndGet(), advertisedAddress, str, bArr);
        final Function<ClientConnection, CompletableFuture<byte[]>> exchange =
            connection -> connection.sendAndReceive(request);

        final CompletableFuture<byte[]> response;
        if (z) {
            response = executeOnPooledConnection(address, str, exchange, executor);
        } else {
            response = executeOnTransientConnection(address, exchange, executor);
        }

        // Fail the response once the timeout elapses; the task is cancelled as soon as the
        // response completes for any other reason.
        final ScheduledFuture<?> timeoutTask = timeoutExecutor.schedule(() -> {
            response.completeExceptionally(new TimeoutException(String.format("Request %s to %s timed out in %s", str, address, duration)));
            openFutures.remove(response);
        }, duration.toNanos(), TimeUnit.NANOSECONDS);
        response.whenComplete((reply, failure) -> timeoutTask.cancel(true));
        return response;
    }

    /**
     * Registers a one-way handler: each request's sender and payload are passed to the consumer on
     * the given executor and no reply is sent.
     */
    @Override
    public void registerHandler(String str, BiConsumer<Address, byte[]> biConsumer, Executor executor) {
        handlers.register(
            str,
            (request, connection) ->
                executor.execute(() -> biConsumer.accept(request.sender(), request.payload())));
    }

    /**
     * Registers a synchronous request/reply handler that runs on the given executor. An exception
     * from the handler is logged and answered with {@code ERROR_HANDLER_EXCEPTION}, carrying the
     * exception message as payload when there is one.
     */
    @Override
    public void registerHandler(String str, BiFunction<Address, byte[], byte[]> biFunction, Executor executor) {
        handlers.register(str, (request, connection) -> executor.execute(() -> {
            ProtocolReply.Status outcome = ProtocolReply.Status.OK;
            byte[] replyPayload = null;
            try {
                replyPayload = biFunction.apply(request.sender(), request.payload());
            } catch (Exception e) {
                log.warn("Unexpected error while handling message {} from {}", request.subject(), request.sender(), e);
                outcome = ProtocolReply.Status.ERROR_HANDLER_EXCEPTION;
                final String reason = e.getMessage();
                if (reason != null) {
                    replyPayload = StringUtil.getBytes(reason);
                }
            }
            connection.reply(request.id(), outcome, Optional.ofNullable(replyPayload));
        }));
    }

    /**
     * Registers an asynchronous request/reply handler. The reply is sent when the future returned
     * by the handler completes: its value with status {@code OK}, or, on failure, status
     * {@code ERROR_HANDLER_EXCEPTION} with the failure message as payload when there is one.
     *
     * @param str message subject to handle
     * @param biFunction handler receiving the sender and the payload
     */
    @Override
    public void registerHandler(String str, BiFunction<Address, byte[], CompletableFuture<byte[]>> biFunction) {
        this.handlers.register(str, (protocolRequest, serverConnection) -> {
            final long id = protocolRequest.id();
            final String subject = protocolRequest.subject();
            final Address sender = protocolRequest.sender();
            // No raw cast here: the typed future keeps `result` a byte[] and `error` a Throwable.
            biFunction.apply(sender, protocolRequest.payload()).whenComplete((result, error) -> {
                final ProtocolReply.Status status;
                // Named differently from the lambda parameter so the handler's result is kept.
                byte[] replyPayload = null;
                if (error == null) {
                    status = ProtocolReply.Status.OK;
                    replyPayload = result;
                } else {
                    this.log.warn("Unexpected error while handling message {} from {}", subject, sender, error);
                    status = ProtocolReply.Status.ERROR_HANDLER_EXCEPTION;
                    if (error.getMessage() != null) {
                        replyPayload = StringUtil.getBytes(error.getMessage());
                    }
                }
                serverConnection.reply(id, status, Optional.ofNullable(replyPayload));
            });
        });
    }

    /** Removes the handler registered for the given subject from the handler registry. */
    @Override
    public void unregisterHandler(String str) {
        handlers.unregister(str);
    }

    /** Returns whether {@code start()} has completed and {@code stop()} has not been called since. */
    @Override
    public boolean isRunning() {
        return started.get();
    }

    /**
     * Starts the service: loads the TLS contexts when TLS is enabled, initialises the transport,
     * bootstraps the server and finally creates the timeout executor and the local connection.
     *
     * @return future completed with this service once it is started; completed immediately when
     *     the service is already running
     */
    public CompletableFuture<MessagingService> start() {
        if (started.get()) {
            log.warn("Already running at local address: {}", advertisedAddress);
            return CompletableFuture.completedFuture(this);
        }

        final CompletableFuture<Void> tlsReady;
        if (config.isTlsEnabled()) {
            tlsReady = loadServerSslContext().thenCompose(serverLoaded -> loadClientSslContext());
        } else {
            tlsReady = CompletableFuture.completedFuture(null);
        }

        initTransport();
        return tlsReady
            .thenCompose(contextsLoaded -> bootstrapServer())
            .thenRun(() -> {
                timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("netty-messaging-timeout-"));
                localConnection = new LocalClientConnection(handlers);
                started.set(true);
                log.info(
                    "Started messaging service bound to {}, advertising {}, and using {}",
                    bindingAddresses,
                    advertisedAddress,
                    config.isTlsEnabled() ? "TLS" : "plaintext");
            })
            .thenApply(done -> this);
    }

    /**
     * Stops the service asynchronously: closes the server channel, shuts both event loop groups
     * down gracefully, shuts the timeout executor down, closes every client connection and fails
     * all still-open request futures with an {@link IllegalStateException}.
     *
     * <p>Interrupts received while waiting are remembered and the interrupt flag is restored once
     * the shutdown sequence has finished.
     *
     * @return future completed when the shutdown has run; completed immediately when the service
     *     was not running
     */
    public CompletableFuture<Void> stop() {
        if (!this.started.compareAndSet(true, false)) {
            return CompletableFuture.completedFuture(null);
        }

        return CompletableFuture.supplyAsync(() -> {
            boolean interrupted = false;
            try {
                try {
                    this.serverChannel.close().sync();
                } catch (InterruptedException e) {
                    interrupted = true;
                }

                // Start both shutdowns before waiting on either of them.
                final long quietPeriodMillis = this.config.getShutdownQuietPeriod().toMillis();
                final long timeoutMillis = this.config.getShutdownTimeout().toMillis();
                final Future<?> serverShutdown = this.serverGroup.shutdownGracefully(quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS);
                final Future<?> clientShutdown = this.clientGroup.shutdownGracefully(quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS);
                try {
                    serverShutdown.sync();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
                try {
                    clientShutdown.sync();
                } catch (InterruptedException e) {
                    interrupted = true;
                }

                this.timeoutExecutor.shutdown();

                for (Map.Entry<Channel, RemoteClientConnection> entry : this.connections.entrySet()) {
                    entry.getKey().close();
                    entry.getValue().close();
                }

                for (CompletableFuture<?> openFuture : this.openFutures) {
                    openFuture.completeExceptionally(new IllegalStateException("MessagingService has been closed."));
                }
                this.openFutures.clear();
            } finally {
                // Runs exactly once whether or not the shutdown sequence above threw.
                this.log.info(
                    "Stopped messaging service bound to {}, advertising {}, and using {}",
                    this.bindingAddresses,
                    this.advertisedAddress,
                    this.config.isTlsEnabled() ? "TLS" : "plaintext");
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            return null;
        });
    }

    /**
     * Builds the client SSL context, trusting the configured certificate chain and restricted to
     * {@code TLS_PROTOCOL}.
     *
     * @return a completed future, or one failed with a {@link MessagingException} wrapping the cause
     */
    private CompletableFuture<Void> loadClientSslContext() {
        try {
            clientSslContext = SslContextBuilder.forClient()
                .trustManager(config.getCertificateChain())
                .sslProvider(SslProvider.OPENSSL_REFCNT)
                .protocols(TLS_PROTOCOL)
                .build();
        } catch (Exception e) {
            return CompletableFuture.failedFuture(new MessagingException("Failed to start messaging service; invalid client TLS configuration", e));
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Builds the server SSL context from the configured certificate chain and private key,
     * restricted to {@code TLS_PROTOCOL}.
     *
     * @return a completed future, or one failed with a {@link MessagingException} wrapping the cause
     */
    private CompletableFuture<Void> loadServerSslContext() {
        try {
            serverSslContext = SslContextBuilder.forServer(config.getCertificateChain(), config.getPrivateKey())
                .sslProvider(SslProvider.OPENSSL_REFCNT)
                .protocols(TLS_PROTOCOL)
                .build();
        } catch (Exception e) {
            return CompletableFuture.failedFuture(new MessagingException("Failed to start messaging service; invalid server TLS configuration", e));
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Selects the epoll transport when it is available and falls back to NIO otherwise. */
    private void initTransport() {
        if (!Epoll.isAvailable()) {
            initNioTransport();
            return;
        }
        initEpollTransport();
    }

    /** Creates the epoll client and server event loop groups and selects the epoll channel classes. */
    private void initEpollTransport() {
        clientGroup = new EpollEventLoopGroup(
            0, Threads.namedThreads("netty-messaging-event-epoll-client-%d", log));
        serverGroup = new EpollEventLoopGroup(
            0, Threads.namedThreads("netty-messaging-event-epoll-server-%d", log));
        serverChannelClass = EpollServerSocketChannel.class;
        clientChannelClass = EpollSocketChannel.class;
    }

    /** Creates the NIO client and server event loop groups and selects the NIO channel classes. */
    private void initNioTransport() {
        clientGroup = new NioEventLoopGroup(
            0, Threads.namedThreads("netty-messaging-event-nio-client-%d", log));
        serverGroup = new NioEventLoopGroup(
            0, Threads.namedThreads("netty-messaging-event-nio-server-%d", log));
        serverChannelClass = NioServerSocketChannel.class;
        clientChannelClass = NioSocketChannel.class;
    }

    /**
     * Runs the callback on a pooled connection to the address.
     *
     * @return a new future that the five-argument overload completes with the callback's outcome
     */
    private <T> CompletableFuture<T> executeOnPooledConnection(Address address, String str, Function<ClientConnection, CompletableFuture<T>> function, Executor executor) {
        final CompletableFuture<T> response = new CompletableFuture<>();
        executeOnPooledConnection(address, str, function, executor, response);
        return response;
    }

    /**
     * Runs the callback on a connection to the address and relays its outcome to
     * {@code completableFuture} on the given executor.
     *
     * <p>Requests addressed to the advertised address go through the local connection. All other
     * requests obtain a channel from the channel pool and are tracked in {@code openFutures} until
     * they complete.
     *
     * @param address destination
     * @param str message subject, used to pick the pooled channel
     * @param function operation to run on the connection
     * @param executor executor on which {@code completableFuture} is completed
     * @param completableFuture future receiving the outcome
     */
    private <T> void executeOnPooledConnection(Address address, String str, Function<ClientConnection, CompletableFuture<T>> function, Executor executor, CompletableFuture<T> completableFuture) {
        if (address.equals(this.advertisedAddress)) {
            function.apply(this.localConnection).whenComplete((result, error) -> {
                if (error == null) {
                    executor.execute(() -> completableFuture.complete(result));
                } else {
                    executor.execute(() -> completableFuture.completeExceptionally(error));
                }
            });
            return;
        }

        this.openFutures.add(completableFuture);
        this.channelPool.getChannel(address, str).whenComplete((channel, channelError) -> {
            if (channelError != null) {
                executor.execute(() -> {
                    completableFuture.completeExceptionally(channelError);
                    this.openFutures.remove(completableFuture);
                });
                return;
            }

            // A request that fails with a TimeoutException closes the channel it used.
            completableFuture.whenComplete((ignored, requestError) -> {
                if (requestError instanceof TimeoutException) {
                    channel.close();
                }
            });

            final RemoteClientConnection connection = getOrCreateClientConnection(channel);
            function.apply(connection).whenComplete((result, sendError) -> {
                if (sendError == null) {
                    executor.execute(() -> {
                        completableFuture.complete(result);
                        this.openFutures.remove(completableFuture);
                    });
                    return;
                }

                // Timeouts and messaging errors leave the connection open; any other root cause
                // closes the channel and drops the connection.
                final Throwable rootCause = Throwables.getRootCause(sendError);
                if (!(rootCause instanceof TimeoutException) && !(rootCause instanceof MessagingException)) {
                    channel.close().addListener(closed -> {
                        this.log.debug("Closing connection to {}", channel.remoteAddress());
                        connection.close();
                        this.connections.remove(channel);
                    });
                }
                executor.execute(() -> {
                    completableFuture.completeExceptionally(sendError);
                    this.openFutures.remove(completableFuture);
                });
            });
        });
    }

    /**
     * Runs {@code function} on a short-lived connection to {@code address}.
     *
     * <p>If {@code address} equals the advertised address of this service, the local connection is
     * used. Otherwise a new channel is opened for this single operation and closed again as soon as
     * the operation has finished, whether it succeeded or failed.
     *
     * @param address the destination address
     * @param function the operation to run against the obtained connection
     * @param executor the executor on which the returned future is completed
     * @param <T> the result type of the operation
     * @return a future that is completed with the outcome of {@code function}, or exceptionally if
     *     the channel could not be opened
     */
    private <T> CompletableFuture<T> executeOnTransientConnection(Address address, Function<ClientConnection, CompletableFuture<T>> function, Executor executor) {
        final CompletableFuture<T> completableFuture = new CompletableFuture<>();
        if (address.equals(this.advertisedAddress)) {
            // Local delivery: no channel involved, just relay the outcome on the caller's executor.
            function.apply(this.localConnection).whenComplete((result, error) -> {
                if (error == null) {
                    executor.execute(() -> completableFuture.complete(result));
                } else {
                    executor.execute(() -> completableFuture.completeExceptionally(error));
                }
            });
            return completableFuture;
        }

        openChannel(address).whenComplete((channel, channelError) -> {
            if (channelError != null) {
                executor.execute(() -> completableFuture.completeExceptionally(channelError));
                return;
            }
            function.apply(getOrCreateClientConnection(channel)).whenComplete((result, sendError) -> {
                if (sendError == null) {
                    executor.execute(() -> completableFuture.complete(result));
                } else {
                    executor.execute(() -> completableFuture.completeExceptionally(sendError));
                }
                // The channel was opened for this one operation only, so always close it.
                channel.close();
            });
        });
        return completableFuture;
    }

    /**
     * Returns the client connection registered for {@code channel}, registering a new one if none
     * exists yet.
     *
     * <p>When no connection was found on the initial lookup, a listener is also attached to the
     * channel's close future that removes the connection from the registry and closes it.
     *
     * @param channel the channel whose connection is requested
     * @return the connection associated with {@code channel}
     */
    private RemoteClientConnection getOrCreateClientConnection(Channel channel) {
        final RemoteClientConnection existing = this.connections.get(channel);
        if (existing != null) {
            return existing;
        }

        final RemoteClientConnection registered = this.connections.computeIfAbsent(channel, ch -> new RemoteClientConnection(this.messagingMetrics, ch));
        // Drop and close the connection once its underlying channel goes away.
        channel.closeFuture().addListener(closed -> {
            final RemoteClientConnection stale = this.connections.remove(channel);
            if (stale != null) {
                stale.close();
            }
        });
        return registered;
    }

    /**
     * Opens a new client channel to {@code address} by delegating to
     * {@link #bootstrapClient(Address)}.
     *
     * @param address the destination address
     * @return the future returned by the client bootstrap
     */
    private CompletableFuture<Channel> openChannel(Address address) {
        final CompletableFuture<Channel> pendingChannel = this.bootstrapClient(address);
        return pendingChannel;
    }

    /**
     * Configures a Netty client bootstrap for {@code address} and starts connecting.
     *
     * <p>The returned future fails with a {@link ConnectException} when the address cannot be
     * resolved, when the connect attempt fails, or when the channel is closed. Successful
     * completion is not performed in this method; the future is handed to the
     * {@code BasicClientChannelInitializer}, which presumably completes it once the channel is
     * ready (confirm against that class).
     *
     * @param address the destination address
     * @return a future for the connected channel
     */
    private CompletableFuture<Channel> bootstrapClient(Address address) {
        final OrderedFuture<Channel> openFuture = new OrderedFuture<>();
        final InetAddress resolvedAddress = address.address(true);
        if (resolvedAddress == null) {
            openFuture.completeExceptionally(new ConnectException("Failed to bootstrap client (address " + address.toString() + " cannot be resolved)"));
            return openFuture;
        }

        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        // Write buffer water marks: 320 KiB low, 640 KiB high.
        bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(327680, 655360));
        // 1 MiB socket receive and send buffers.
        bootstrap.option(ChannelOption.SO_RCVBUF, 1048576);
        bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
        bootstrap.group(this.clientGroup);
        bootstrap.channel(this.clientChannelClass);
        bootstrap.remoteAddress(resolvedAddress, address.port());
        bootstrap.handler(new BasicClientChannelInitializer(openFuture));

        final Channel channel = bootstrap.connect().addListener(connectResult -> {
            if (connectResult.isSuccess()) {
                return;
            }
            // Report the address that was actually handed to the bootstrap above.
            openFuture.completeExceptionally(new ConnectException(String.format("Failed to connect channel for address %s (resolved: %s) : %s", address, resolvedAddress, connectResult.cause())));
        }).channel();

        // Make sure the caller is notified if the channel closes; with standard CompletableFuture
        // semantics this has no effect when the future has already been completed.
        channel.closeFuture().addListener(closeResult -> {
            openFuture.completeExceptionally(new ConnectException(String.format("Channel %s for address %s was closed unexpectedly before the request was handled", channel, address)));
        });
        return openFuture;
    }

    /**
     * Builds the Netty server bootstrap and binds it to the configured binding addresses.
     *
     * <p>The server group accepts connections and the client group handles the accepted child
     * channels. Child channels use 8 KiB / 32 KiB write buffer water marks and 1 MiB socket
     * receive and send buffers.
     *
     * @return the future returned by {@link #bind(ServerBootstrap)}
     */
    private CompletableFuture<Void> bootstrapServer() {
        final ServerBootstrap serverBootstrap = new ServerBootstrap()
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8192, 32768))
                .childOption(ChannelOption.SO_RCVBUF, 1048576)
                .childOption(ChannelOption.SO_SNDBUF, 1048576)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .group(this.serverGroup, this.clientGroup)
                .channel(this.serverChannelClass)
                .childHandler(new BasicServerChannelInitializer());
        return this.bind(serverBootstrap);
    }

    /**
     * Binds {@code serverBootstrap} to every address in {@code bindingAddresses}.
     *
     * <p>Allocates the result future and hands the work to the iterator-based overload.
     *
     * @param serverBootstrap the configured server bootstrap
     * @return a future that is completed by the iterator-based overload
     */
    private CompletableFuture<Void> bind(ServerBootstrap serverBootstrap) {
        final CompletableFuture<Void> bindFuture = new CompletableFuture<>();
        final Iterator<Address> remainingAddresses = this.bindingAddresses.iterator();
        this.bind(serverBootstrap, remainingAddresses, bindFuture);
        return bindFuture;
    }

    /**
     * Binds {@code serverBootstrap} to the remaining addresses of {@code it}, one after another.
     *
     * <p>Each successful bind triggers the bind for the next address from within its listener. When
     * the iterator is exhausted, {@code completableFuture} is completed with {@code null}; the
     * first failed bind completes it exceptionally and stops the sequence.
     *
     * @param serverBootstrap the configured server bootstrap
     * @param it the addresses that still have to be bound
     * @param completableFuture the future that reports the overall outcome
     */
    private void bind(ServerBootstrap serverBootstrap, Iterator<Address> it, CompletableFuture<Void> completableFuture) {
        if (!it.hasNext()) {
            completableFuture.complete(null);
            return;
        }

        final Address next = it.next();
        // The listener is typed as ChannelFutureListener so the bound channel is accessible.
        serverBootstrap.bind(next.host(), next.port()).addListener((io.netty.channel.ChannelFutureListener) channelFuture -> {
            if (!channelFuture.isSuccess()) {
                this.log.warn("Failed to bind TCP server to port {} due to {}", next, channelFuture.cause());
                completableFuture.completeExceptionally(channelFuture.cause());
            } else {
                this.log.info("TCP server listening for connections on {}", next);
                // NOTE(review): this field is overwritten on every successful bind, so only the
                // most recently bound channel is retained here.
                this.serverChannel = channelFuture.channel();
                bind(serverBootstrap, it, completableFuture);
            }
        });
    }
}
