package org.apache.ignite.internal.network.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/* loaded from: input_file:org/apache/ignite/internal/network/netty/ConnectionManager.class */
/**
 * Manages network connections of a node: owns a {@link NettyServer} for incoming connections and
 * creates {@link NettyClient}s for outgoing ones.
 *
 * <p>Open channels are cached by the remote consistent id, clients are cached by the remote address.
 * Messages received from either side are dispatched to the registered listeners.
 */
public class ConnectionManager {
    /** Logger. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(ConnectionManager.class);

    /** Latest version of the direct marshalling protocol. */
    public static final byte DIRECT_PROTOCOL_VERSION = 1;

    /** How long {@link #start()} waits for the server to start, in seconds. */
    private static final long START_TIMEOUT_SECONDS = 3L;

    /** Quiet period passed to the client worker group shutdown, in seconds. */
    private static final long SHUTDOWN_QUIET_PERIOD_SECONDS = 0L;

    /** Maximum time to wait for the client worker group shutdown, in seconds. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 15L;

    /** Bootstrap used to start outgoing connections. */
    private final Bootstrap clientBootstrap;

    /** Event loop group shared by all outgoing connections. */
    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();

    /** Server accepting incoming connections. */
    private final NettyServer server;

    /** Open channels, keyed by the value of {@link NettySender#consistentId()}. */
    private final Map<String, NettySender> channels = new ConcurrentHashMap<>();

    /** Outgoing clients, keyed by the remote address. */
    private final Map<SocketAddress, NettyClient> clients = new ConcurrentHashMap<>();

    /** Serialization registry handed to the server and to every client. */
    private final MessageSerializationRegistry serializationRegistry;

    /** Listeners notified about every received message. */
    private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>();

    /** Consistent id given to this manager at construction time. */
    private final String consistentId;

    /** Factory producing a handshake manager for every new outgoing connection. */
    private final Supplier<HandshakeManager> clientHandshakeManagerFactory;

    /** Set once {@link #start()} has been called. */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Set once {@link #stop()} has been called. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    /**
     * Creates a connection manager. Nothing is bound or connected until {@link #start()} is called.
     *
     * @param port Value passed as the first argument to {@link NettyServer}; presumably the port
     *     the server binds to (not verifiable from this class alone).
     * @param registry Serialization registry.
     * @param consistentId Consistent id of this manager.
     * @param serverHandshakeManagerFactory Factory of handshake managers handed to the server.
     * @param clientHandshakeManagerFactory Factory of handshake managers used for outgoing connections.
     */
    public ConnectionManager(
            int port,
            MessageSerializationRegistry registry,
            String consistentId,
            Supplier<HandshakeManager> serverHandshakeManagerFactory,
            Supplier<HandshakeManager> clientHandshakeManagerFactory
    ) {
        this.serializationRegistry = registry;
        this.consistentId = consistentId;
        this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
        this.server = new NettyServer(
                port,
                serverHandshakeManagerFactory,
                this::onNewIncomingChannel,
                this::onMessage,
                this.serializationRegistry
        );
        this.clientBootstrap = createClientBootstrap(this.clientWorkerGroup, this.serializationRegistry);
    }

    /**
     * Starts the server and waits (up to {@value #START_TIMEOUT_SECONDS} seconds) for it to be ready.
     *
     * @throws IgniteInternalException If the manager was already started or stopped, if the calling
     *     thread was interrupted, if the server failed to start, or if the wait timed out.
     */
    public void start() throws IgniteInternalException {
        if (started.getAndSet(true)) {
            throw new IgniteInternalException("Attempted to start an already started connection manager");
        }

        if (stopped.get()) {
            throw new IgniteInternalException("Attempted to start an already stopped connection manager");
        }

        try {
            server.start().get(START_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            LOG.info("Connection created [address=" + server.address() + "]", new Object[0]);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers further up the stack can observe it.
            Thread.currentThread().interrupt();

            throw new IgniteInternalException("Interrupted while starting the connection manager", e);
        } catch (ExecutionException e) {
            // The cause is normally present, but guard against a null cause to avoid masking the failure.
            Throwable cause = e.getCause() != null ? e.getCause() : e;

            throw new IgniteInternalException("Failed to start the connection manager: " + cause.getMessage(), cause);
        } catch (TimeoutException e) {
            throw new IgniteInternalException("Timeout while waiting for the connection manager to start", e);
        }
    }

    /**
     * Returns the address reported by the server.
     *
     * @return Server address.
     */
    public SocketAddress getLocalAddress() {
        return server.address();
    }

    /**
     * Gets a sender for the given remote node, reusing an open channel or client when possible.
     *
     * @param consistentId Consistent id of the remote node, or {@code null} if it is not known;
     *     in the latter case the channel cache is not consulted.
     * @param address Remote address used to look up or create a client.
     * @return Future of the sender.
     */
    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
        if (consistentId != null) {
            // Returning null from the remapping function evicts a missing or closed channel.
            NettySender cached = channels.compute(
                    consistentId,
                    (id, existing) -> existing != null && existing.isOpen() ? existing : null
            );

            if (cached != null) {
                return CompletableFuture.completedFuture(cached);
            }
        }

        // Reuse the client for this address unless it is absent, has failed to connect or has disconnected.
        NettyClient client = clients.compute(address, (addr, existing) -> {
            boolean reusable = existing != null && !existing.failedToConnect() && !existing.isDisconnected();

            return reusable ? existing : connect(addr);
        });

        CompletableFuture<NettySender> sender = client.sender();

        assert sender != null;

        return sender;
    }

    /**
     * Dispatches a received message to every registered listener.
     *
     * @param from Address the message came from.
     * @param message Received message.
     */
    private void onMessage(SocketAddress from, NetworkMessage message) {
        for (BiConsumer<SocketAddress, NetworkMessage> listener : listeners) {
            listener.accept(from, message);
        }
    }

    /**
     * Registers a channel opened by a remote node under its consistent id.
     *
     * @param channel New incoming channel.
     */
    private void onNewIncomingChannel(NettySender channel) {
        channels.put(channel.consistentId(), channel);
    }

    /**
     * Creates a client for the given address and starts it. On success the resulting sender is
     * cached in {@link #channels}; on failure the client is removed from {@link #clients}.
     *
     * @param address Remote address.
     * @return Newly created client (its start may still be in progress).
     */
    private NettyClient connect(SocketAddress address) {
        NettyClient client = new NettyClient(
                address,
                serializationRegistry,
                clientHandshakeManagerFactory.get(),
                this::onMessage
        );

        client.start(clientBootstrap).whenComplete((sender, err) -> {
            if (err == null) {
                channels.put(sender.consistentId(), sender);
            } else {
                clients.remove(address);
            }
        });

        return client;
    }

    /**
     * Adds a listener that is invoked for every received message.
     *
     * @param listener Message listener.
     */
    public void addListener(BiConsumer<SocketAddress, NetworkMessage> listener) {
        listeners.add(listener);
    }

    /**
     * Stops all clients and the server, then shuts down the client event loop group.
     * Subsequent calls are no-ops. Failures are logged rather than propagated.
     */
    public void stop() {
        if (stopped.getAndSet(true)) {
            return;
        }

        try {
            Stream<CompletableFuture<?>> clientStops = clients.values().stream().map(NettyClient::stop);

            CompletableFuture<?>[] stopFutures = Stream.concat(clientStops, Stream.of(server.stop()))
                    .toArray(CompletableFuture<?>[]::new);

            CompletableFuture.allOf(stopFutures).join();
        } catch (Exception e) {
            LOG.warn("Failed to stop the ConnectionManager: " + e.getMessage(), e);
        } finally {
            // Always release the event loop threads, even if stopping a client or the server failed.
            shutdownClientWorkerGroup();
        }
    }

    /** Shuts down the client event loop group and waits for the shutdown to finish. */
    private void shutdownClientWorkerGroup() {
        try {
            clientWorkerGroup
                    .shutdownGracefully(SHUTDOWN_QUIET_PERIOD_SECONDS, SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                    .sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();

            LOG.warn("Failed to stop the ConnectionManager: " + e.getMessage(), e);
        } catch (Exception e) {
            LOG.warn("Failed to stop the ConnectionManager: " + e.getMessage(), e);
        }
    }

    /**
     * Returns the server.
     *
     * @return Server.
     */
    @TestOnly
    public NettyServer server() {
        return server;
    }

    /**
     * Returns the consistent id of this manager.
     *
     * @return Consistent id.
     */
    @TestOnly
    public String consistentId() {
        return consistentId;
    }

    /**
     * Returns a read-only view of the current clients.
     *
     * @return Clients.
     */
    @TestOnly
    public Collection<NettyClient> clients() {
        return Collections.unmodifiableCollection(clients.values());
    }

    /**
     * Returns a read-only view of the current channels.
     *
     * @return Channels keyed by consistent id.
     */
    @TestOnly
    public Map<String, NettySender> channels() {
        return Collections.unmodifiableMap(channels);
    }

    /**
     * Creates a client bootstrap on the given event loop group with {@code SO_KEEPALIVE} and
     * {@code TCP_NODELAY} enabled and {@code SO_LINGER} set to zero.
     *
     * @param eventLoopGroup Event loop group for the client channels.
     * @param messageSerializationRegistry Serialization registry; not used by this method, kept
     *     for signature compatibility.
     * @return Configured bootstrap.
     */
    public static Bootstrap createClientBootstrap(
            EventLoopGroup eventLoopGroup,
            MessageSerializationRegistry messageSerializationRegistry
    ) {
        Bootstrap bootstrap = new Bootstrap();

        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.SO_LINGER, 0)
                .option(ChannelOption.TCP_NODELAY, true);

        return bootstrap;
    }
}
