package org.apache.pekko.remote.transport.netty;

import com.typesafe.config.Config;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Address;
import org.apache.pekko.actor.Address$;
import org.apache.pekko.actor.ExtendedActorSystem;
import org.apache.pekko.dispatch.Dispatchers;
import org.apache.pekko.event.LogSource$;
import org.apache.pekko.event.Logging$;
import org.apache.pekko.event.MarkerLoggingAdapter;
import org.apache.pekko.remote.RARP;
import org.apache.pekko.remote.RARP$;
import org.apache.pekko.remote.transport.AssociationHandle;
import org.apache.pekko.remote.transport.Transport;
import org.apache.pekko.util.OptionVal$;
import org.apache.pekko.util.OptionVal$Some$;
import org.jboss.netty.bootstrap.Bootstrap;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioWorkerPool;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.util.HashedWheelTimer;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.control.NonFatal$;

/* compiled from: NettyTransport.scala */
/* loaded from: input_file:org/apache/pekko/remote/transport/netty/NettyTransport.class */
public class NettyTransport implements Transport {
    // Transport configuration passed to the primary constructor.
    private final NettyTransportSettings settings;
    // Actor system this transport belongs to; source of dispatchers and the thread factory.
    private final ExtendedActorSystem system;
    // Execution context used for all Future callbacks in this class. Chosen in the
    // constructor: a configured dispatcher if one is set, else the system dispatcher.
    private final ExecutionContext executionContext;
    // "tcp" or "ssl.tcp", depending on settings.EnableSsl().
    private final String schemeIdentifier;
    // Written once by listen() after a successful bind; null until then.
    private volatile Address boundTo;
    // Server channel created by listen(); null until then. Read by associate().
    private volatile Channel serverChannel;
    // Logger; public with a mangled name so the nested handler classes can reach it.
    public final MarkerLoggingAdapter org$apache$pekko$remote$transport$netty$NettyTransport$$log;
    // Raw-typed map exposed through udpConnectionTable(); not otherwise used in this file.
    private final ConcurrentHashMap udpConnectionTable;
    // Group the server channel is added to; shutdown() unbinds/disconnects/closes through it.
    private final DefaultChannelGroup channelGroup;
    // Netty channel factory used for outbound (client) bootstraps.
    private final ChannelFactory clientChannelFactory;
    // Netty channel factory used for the inbound (server) bootstrap.
    private final ChannelFactory serverChannelFactory;
    // Promise handed out by listen(); once completed, the server channel is made readable.
    public final Promise<Transport.AssociationEventListener> org$apache$pekko$remote$transport$netty$NettyTransport$$associationListenerPromise;
    // SSL engine provider; null when SSL is disabled (see constructor).
    private final SSLEngineProvider sslEngineProvider;
    // Server-side bootstrap configured in the constructor and bound in listen().
    private final Bootstrap inboundBootstrap;

    /**
     * Static forwarder to {@code NettyTransport$.MODULE$.FrameLengthFieldLength()}.
     *
     * @return the frame length-field size reported by the singleton
     */
    public static int FrameLengthFieldLength() {
        final NettyTransport$ module = NettyTransport$.MODULE$;
        return module.FrameLengthFieldLength();
    }

    /**
     * Static forwarder to the four-argument
     * {@code NettyTransport$.MODULE$.addressFromSocketAddress}; all arguments are
     * passed through unchanged.
     */
    public static Option<Address> addressFromSocketAddress(SocketAddress socketAddress, String str, String str2, Option<String> option) {
        final NettyTransport$ module = NettyTransport$.MODULE$;
        final Option<Address> converted = module.addressFromSocketAddress(socketAddress, str, str2, option);
        return converted;
    }

    /**
     * Static forwarder to the five-argument
     * {@code NettyTransport$.MODULE$.addressFromSocketAddress}; all arguments are
     * passed through unchanged.
     */
    public static Option<Address> addressFromSocketAddress(SocketAddress socketAddress, String str, String str2, Option<String> option, Option<Object> option2) {
        final NettyTransport$ module = NettyTransport$.MODULE$;
        final Option<Address> converted = module.addressFromSocketAddress(socketAddress, str, str2, option, option2);
        return converted;
    }

    /**
     * Static forwarder to {@code NettyTransport$.MODULE$.gracefulClose}.
     *
     * @param channel          channel handed to the singleton's implementation
     * @param executionContext execution context handed to the singleton's implementation
     */
    public static void gracefulClose(Channel channel, ExecutionContext executionContext) {
        final NettyTransport$ module = NettyTransport$.MODULE$;
        module.gracefulClose(channel, executionContext);
    }

    /**
     * Static forwarder to {@code NettyTransport$.MODULE$.uniqueIdCounter()}.
     *
     * @return the counter held by the singleton (used by the constructor to name channel groups)
     */
    public static AtomicInteger uniqueIdCounter() {
        final NettyTransport$ module = NettyTransport$.MODULE$;
        return module.uniqueIdCounter();
    }

    /**
     * Primary constructor: wires up the execution context, channel factories,
     * optional SSL engine provider and the inbound (server) bootstrap.
     *
     * <p>Fixes relative to the raw decompiler output: the lambdas now pass the
     * captured {@code extendedActorSystem} instead of an undefined placeholder,
     * and the server pipeline factory is a well-formed anonymous class that
     * reaches the enclosing transport through {@code NettyTransport.this}.
     *
     * @param nettyTransportSettings transport configuration
     * @param extendedActorSystem    owning actor system
     */
    public NettyTransport(NettyTransportSettings nettyTransportSettings, ExtendedActorSystem extendedActorSystem) {
        SSLEngineProvider sSLEngineProvider;
        this.settings = nettyTransportSettings;
        this.system = extendedActorSystem;
        // Dispatcher id: the I/O dispatcher setting if present, otherwise the
        // remoting dispatcher setting (when non-empty).
        Option orElse = nettyTransportSettings.UseDispatcherForIo().orElse(() -> {
            return $init$$$anonfun$3(extendedActorSystem);
        });
        Dispatchers dispatchers = extendedActorSystem.dispatchers();
        // Look the dispatcher up by id; fall back to the system's default dispatcher.
        this.executionContext = (ExecutionContext) orElse.map(str -> {
            return dispatchers.lookup((String) str);
        }).getOrElse(() -> {
            return $init$$$anonfun$5(extendedActorSystem);
        });
        this.schemeIdentifier = new StringBuilder(3).append(nettyTransportSettings.EnableSsl() ? "ssl." : "").append("tcp").toString();
        this.org$apache$pekko$remote$transport$netty$NettyTransport$$log = Logging$.MODULE$.withMarker(extendedActorSystem, NettyTransport.class, LogSource$.MODULE$.fromAnyClass());
        this.udpConnectionTable = new ConcurrentHashMap();
        this.channelGroup = new DefaultChannelGroup(new StringBuilder(42).append("pekko-netty-transport-driver-channelgroup-").append(NettyTransport$.MODULE$.uniqueIdCounter().getAndIncrement()).toString());
        this.clientChannelFactory = new NioClientSocketChannelFactory(createExecutorService(), 1, new NioWorkerPool(createExecutorService(), nettyTransportSettings.ClientSocketWorkerPoolSize()), new HashedWheelTimer(extendedActorSystem.threadFactory()));
        this.serverChannelFactory = new NioServerSocketChannelFactory(createExecutorService(), createExecutorService(), nettyTransportSettings.ServerSocketWorkerPoolSize());
        this.org$apache$pekko$remote$transport$netty$NettyTransport$$associationListenerPromise = Promise$.MODULE$.apply();
        if (nettyTransportSettings.EnableSsl()) {
            // Instantiate the configured provider class reflectively, passing the actor system.
            sSLEngineProvider = (SSLEngineProvider) OptionVal$Some$.MODULE$.apply(extendedActorSystem.dynamicAccess().createInstanceFor(nettyTransportSettings.SSLEngineProviderClassName(), (Seq) package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Tuple2$.MODULE$.apply(ActorSystem.class, extendedActorSystem)})), ClassTag$.MODULE$.apply(SSLEngineProvider.class)).recover(new NettyTransport$$anon$4(this)).get());
        } else {
            OptionVal$.MODULE$.None();
            sSLEngineProvider = null;
        }
        this.sslEngineProvider = sSLEngineProvider;
        this.inboundBootstrap = setupBootstrap(new ServerBootstrap(this.serverChannelFactory), new ChannelPipelineFactory() {
            // Builds the pipeline for each inbound channel: optional SSL handler in
            // server mode first, the framing handlers, then the server handler last.
            @Override
            public ChannelPipeline getPipeline() {
                DefaultChannelPipeline pipeline = NettyTransport.this.org$apache$pekko$remote$transport$netty$NettyTransport$$newPipeline();
                if (NettyTransport.this.settings().EnableSsl()) {
                    pipeline.addFirst("SslHandler", NettyTransport.this.org$apache$pekko$remote$transport$netty$NettyTransport$$sslHandler(false));
                }
                pipeline.addLast("ServerHandler", new TcpServerHandler(NettyTransport.this, NettyTransport.this.org$apache$pekko$remote$transport$netty$NettyTransport$$associationListenerPromise.future(), NettyTransport.this.org$apache$pekko$remote$transport$netty$NettyTransport$$log));
                return pipeline;
            }
        });
    }

    /**
     * Forwarder for {@code Transport.managementCommand}.
     *
     * <p>The decompiled body called {@code managementCommand(obj)} on itself, which as
     * Java source recurses without bound. It now delegates to the interface's
     * default implementation.
     * NOTE(review): assumes {@code Transport} declares {@code managementCommand} as a
     * default method — confirm against the Transport interface.
     *
     * @param obj the management command
     * @return the future produced by the interface's default implementation
     */
    @Override // org.apache.pekko.remote.transport.Transport
    public /* bridge */ /* synthetic */ Future managementCommand(Object obj) {
        return Transport.super.managementCommand(obj);
    }

    /** @return the transport settings supplied at construction */
    public NettyTransportSettings settings() {
        return settings;
    }

    /** @return the actor system supplied at construction */
    public ExtendedActorSystem system() {
        return system;
    }

    /**
     * Convenience constructor: wraps {@code config} in a {@link NettyTransportSettings}
     * and delegates to the primary constructor.
     *
     * @param extendedActorSystem owning actor system
     * @param config              configuration handed to {@code NettyTransportSettings}
     */
    public NettyTransport(ExtendedActorSystem extendedActorSystem, Config config) {
        this(new NettyTransportSettings(config), extendedActorSystem);
    }

    /** @return the execution context selected in the constructor for all Future callbacks */
    public ExecutionContext executionContext() {
        return executionContext;
    }

    /** @return {@code "ssl.tcp"} when SSL is enabled in the settings, otherwise {@code "tcp"} */
    @Override // org.apache.pekko.remote.transport.Transport
    public String schemeIdentifier() {
        return schemeIdentifier;
    }

    /** @return the configured {@code MaxFrameSize} from the transport settings */
    @Override // org.apache.pekko.remote.transport.Transport
    public int maximumPayloadBytes() {
        final int maxFrameSize = settings().MaxFrameSize();
        return maxFrameSize;
    }

    /** @return the connection table created in the constructor (backing field is raw-typed) */
    public final ConcurrentHashMap<SocketAddress, AssociationHandle.HandleEventListener> udpConnectionTable() {
        return udpConnectionTable;
    }

    /**
     * Picks the executor handed to the Netty channel factories.
     *
     * @return the dispatcher named by {@code UseDispatcherForIo} when that setting is
     *         defined, otherwise a new cached thread pool built on the system's
     *         thread factory
     */
    private Executor createExecutorService() {
        Option<String> ioDispatcherId = settings().UseDispatcherForIo();
        Dispatchers registry = system().dispatchers();
        Option configuredDispatcher = ioDispatcherId.map(id -> {
            return registry.lookup(id);
        });
        // Fallback is only evaluated when no dispatcher id is configured.
        return (Executor) configuredDispatcher.getOrElse(this::createExecutorService$$anonfun$2);
    }

    /** @return the channel group created in the constructor */
    public DefaultChannelGroup channelGroup() {
        return channelGroup;
    }

    /**
     * Creates a pipeline containing only the framing handlers: a length-field based
     * decoder limited to {@link #maximumPayloadBytes()} and a matching length-field
     * prepender, registered as "FrameDecoder" and "FrameEncoder".
     *
     * @return a fresh pipeline with the two framing handlers appended in that order
     */
    public DefaultChannelPipeline org$apache$pekko$remote$transport$netty$NettyTransport$$newPipeline() {
        DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
        // Length field sits at offset 0; the same number of bytes is stripped from each frame.
        LengthFieldBasedFrameDecoder frameDecoder = new LengthFieldBasedFrameDecoder(maximumPayloadBytes(), 0, NettyTransport$.MODULE$.FrameLengthFieldLength(), 0, NettyTransport$.MODULE$.FrameLengthFieldLength(), true);
        pipeline.addLast("FrameDecoder", frameDecoder);
        LengthFieldPrepender frameEncoder = new LengthFieldPrepender(NettyTransport$.MODULE$.FrameLengthFieldLength());
        pipeline.addLast("FrameEncoder", frameEncoder);
        return pipeline;
    }

    /**
     * Builds an SSL handler from the configured engine provider.
     *
     * @param z passed through to {@code NettySSLSupport}; callers in this class pass
     *          {@code true} for the client pipeline and {@code false} for the server pipeline
     * @return a handler configured to close the channel on SSL exceptions
     * @throws IllegalStateException if no SSL engine provider was configured
     */
    public SslHandler org$apache$pekko$remote$transport$netty$NettyTransport$$sslHandler(boolean z) {
        SSLEngineProvider provider = (SSLEngineProvider) OptionVal$Some$.MODULE$.unapply(this.sslEngineProvider);
        boolean providerPresent = !OptionVal$.MODULE$.isEmpty$extension(provider);
        if (providerPresent) {
            SslHandler handler = NettySSLSupport$.MODULE$.apply((SSLEngineProvider) OptionVal$.MODULE$.get$extension(provider), z);
            handler.setCloseOnSSLException(true);
            return handler;
        }
        throw new IllegalStateException("Expected enable-ssl=on");
    }

    /**
     * Creates the pipeline factory used for outbound connections to {@code address}.
     *
     * <p>Fix relative to the raw decompiler output: the anonymous class no longer
     * passes constructor arguments to an interface or assigns itself to a synthetic
     * {@code $outer} field; it captures {@code address} and reaches the enclosing
     * transport through {@code NettyTransport.this}.
     *
     * @param address remote address handed to each {@code TcpClientHandler}
     * @return a factory producing: optional SSL handler (client mode) first, the
     *         framing handlers, then the client handler last
     */
    private ChannelPipelineFactory clientPipelineFactory(final Address address) {
        return new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                DefaultChannelPipeline pipeline = NettyTransport.this.org$apache$pekko$remote$transport$netty$NettyTransport$$newPipeline();
                if (NettyTransport.this.settings().EnableSsl()) {
                    pipeline.addFirst("SslHandler", NettyTransport.this.org$apache$pekko$remote$transport$netty$NettyTransport$$sslHandler(true));
                }
                pipeline.addLast("clienthandler", new TcpClientHandler(NettyTransport.this, address, NettyTransport.this.org$apache$pekko$remote$transport$netty$NettyTransport$$log));
                return pipeline;
            }
        };
    }

    /**
     * Applies the pipeline factory and the common socket options to a bootstrap.
     *
     * @param b                      bootstrap to configure (mutated in place)
     * @param channelPipelineFactory factory installed on the bootstrap
     * @return the same bootstrap instance, for chaining
     */
    private <B extends Bootstrap> B setupBootstrap(B b, ChannelPipelineFactory channelPipelineFactory) {
        b.setPipelineFactory(channelPipelineFactory);
        final NettyTransportSettings cfg = settings();
        // Options that are always set.
        b.setOption("backlog", BoxesRunTime.boxToInteger(cfg.Backlog()));
        b.setOption("child.tcpNoDelay", BoxesRunTime.boxToBoolean(cfg.TcpNodelay()));
        b.setOption("child.keepAlive", BoxesRunTime.boxToBoolean(cfg.TcpKeepalive()));
        b.setOption("reuseAddress", BoxesRunTime.boxToBoolean(cfg.TcpReuseAddr()));
        // Buffer-related options are applied only when the corresponding setting is defined.
        cfg.ReceiveBufferSize().foreach(receiveBytes -> {
            b.setOption("receiveBufferSize", BoxesRunTime.boxToInteger(receiveBytes));
        });
        cfg.SendBufferSize().foreach(sendBytes -> {
            b.setOption("sendBufferSize", BoxesRunTime.boxToInteger(sendBytes));
        });
        cfg.WriteBufferHighWaterMark().foreach(highMark -> {
            b.setOption("writeBufferHighWaterMark", BoxesRunTime.boxToInteger(highMark));
        });
        cfg.WriteBufferLowWaterMark().foreach(lowMark -> {
            b.setOption("writeBufferLowWaterMark", BoxesRunTime.boxToInteger(lowMark));
        });
        return b;
    }

    /**
     * Builds a client bootstrap for connecting to {@code address}: the common options
     * from {@code setupBootstrap} plus the client-side connect timeout, no-delay,
     * keep-alive and optional buffer settings.
     *
     * @param address remote address the resulting pipeline's client handler is bound to
     * @return a configured client bootstrap backed by the client channel factory
     */
    private ClientBootstrap outboundBootstrap(Address address) {
        ClientBootstrap client = setupBootstrap(new ClientBootstrap(this.clientChannelFactory), clientPipelineFactory(address));
        final NettyTransportSettings cfg = settings();
        client.setOption("connectTimeoutMillis", BoxesRunTime.boxToLong(cfg.ConnectionTimeout().toMillis()));
        client.setOption("tcpNoDelay", BoxesRunTime.boxToBoolean(cfg.TcpNodelay()));
        client.setOption("keepAlive", BoxesRunTime.boxToBoolean(cfg.TcpKeepalive()));
        // Buffer-related options are applied only when the corresponding setting is defined.
        cfg.ReceiveBufferSize().foreach(receiveBytes -> {
            client.setOption("receiveBufferSize", BoxesRunTime.boxToInteger(receiveBytes));
        });
        cfg.SendBufferSize().foreach(sendBytes -> {
            client.setOption("sendBufferSize", BoxesRunTime.boxToInteger(sendBytes));
        });
        cfg.WriteBufferHighWaterMark().foreach(highMark -> {
            client.setOption("writeBufferHighWaterMark", BoxesRunTime.boxToInteger(highMark));
        });
        cfg.WriteBufferLowWaterMark().foreach(lowMark -> {
            client.setOption("writeBufferLowWaterMark", BoxesRunTime.boxToInteger(lowMark));
        });
        return client;
    }

    /**
     * Always answers {@code true}; the argument is not inspected.
     *
     * @param address ignored
     * @return {@code true}
     */
    @Override // org.apache.pekko.remote.transport.Transport
    public boolean isResponsibleFor(Address address) {
        return true;
    }

    /**
     * Resolves an actor {@code Address} to an {@link InetSocketAddress}.
     *
     * <p>Fixes relative to the raw decompiler output: the lambda now passes the
     * extracted host and port instead of undefined placeholders, and the extracted
     * fields are held as {@code Option} rather than being force-typed as {@code Some}.
     *
     * @param address address whose third and fourth components (host, port) are used
     * @return a future running the (blocking) host lookup on {@link #executionContext()},
     *         or a future failed with {@link IllegalArgumentException} when
     *         {@code address} is null or lacks host or port
     */
    public Future<InetSocketAddress> addressToSocketAddress(Address address) {
        if (address != null) {
            Address unapply = Address$.MODULE$.unapply(address);
            // First two components are read but unused, as in the decompiled pattern match.
            unapply._1();
            unapply._2();
            Option hostOption = unapply._3();
            Option portOption = unapply._4();
            if (hostOption instanceof Some && portOption instanceof Some) {
                final String host = (String) ((Some) hostOption).value();
                final int port = BoxesRunTime.unboxToInt(((Some) portOption).value());
                return Future$.MODULE$.apply(() -> {
                    return addressToSocketAddress$$anonfun$1(host, port);
                }, executionContext());
            }
        }
        return Future$.MODULE$.failed(new IllegalArgumentException("Address [" + address + "] does not contain host or port information."));
    }

    /**
     * Binds the inbound bootstrap to the configured bind host/port and reports the
     * address to advertise together with the association-listener promise.
     *
     * <p>The bound channel starts non-readable and is switched to readable once the
     * association-listener promise completes.
     *
     * <p>Fixes relative to the raw decompiler output:
     * <ul>
     *   <li>a non-fatal failure of {@code shutdown()} during error handling is now
     *       ignored so the original bind failure is the one propagated (previously
     *       the shutdown failure was rethrown and masked it);</li>
     *   <li>the bootstrap is held as {@code Bootstrap} so both {@code instanceof}
     *       branches are well-typed;</li>
     *   <li>address lookups are held as {@code Option} rather than {@code Some}.</li>
     * </ul>
     *
     * @return future of (advertised address, association-listener promise); fails with
     *         the bind error, or with {@code NettyTransportException} when the local
     *         address cannot be converted
     */
    @Override // org.apache.pekko.remote.transport.Transport
    public Future<Tuple2<Address, Promise<Transport.AssociationEventListener>>> listen() {
        return addressToSocketAddress(Address$.MODULE$.apply("", "", settings().BindHostname(), settings().BindPortSelector())).map(inetSocketAddress -> {
            try {
                Bootstrap bootstrap = this.inboundBootstrap;
                Channel bound;
                if (bootstrap instanceof ServerBootstrap) {
                    bound = ((ServerBootstrap) bootstrap).bind(inetSocketAddress);
                } else if (bootstrap instanceof ConnectionlessBootstrap) {
                    bound = ((ConnectionlessBootstrap) bootstrap).bind(inetSocketAddress);
                } else {
                    throw new IllegalStateException();
                }
                final Channel channel = bound;
                // Hold back inbound traffic until an association listener is registered.
                channel.setReadable(false);
                channelGroup().add(channel);
                this.serverChannel = channel;
                // Address to advertise: configured hostname, and configured port unless it is 0.
                Option<Address> advertised = NettyTransport$.MODULE$.addressFromSocketAddress(channel.getLocalAddress(), schemeIdentifier(), system().name(), Some$.MODULE$.apply(settings().Hostname()), settings().PortSelector() == 0 ? None$.MODULE$ : Some$.MODULE$.apply(BoxesRunTime.boxToInteger(settings().PortSelector())));
                if (!(advertised instanceof Some)) {
                    if (None$.MODULE$.equals(advertised)) {
                        throw new NettyTransportException("Unknown local address type [" + channel.getLocalAddress().getClass().getName() + "]");
                    }
                    throw new MatchError(advertised);
                }
                Address address = ((Some<Address>) advertised).value();
                // Actually bound address: no hostname or port override.
                Option<Address> actual = NettyTransport$.MODULE$.addressFromSocketAddress(channel.getLocalAddress(), schemeIdentifier(), system().name(), None$.MODULE$, None$.MODULE$);
                if (actual instanceof Some) {
                    this.boundTo = ((Some<Address>) actual).value();
                    this.org$apache$pekko$remote$transport$netty$NettyTransport$$associationListenerPromise.future().foreach(associationEventListener -> {
                        return channel.setReadable(true);
                    }, executionContext());
                    return Tuple2$.MODULE$.apply(address, this.org$apache$pekko$remote$transport$netty$NettyTransport$$associationListenerPromise);
                }
                if (None$.MODULE$.equals(actual)) {
                    throw new NettyTransportException("Unknown local address type [" + channel.getLocalAddress().getClass().getName() + "]");
                }
                throw new MatchError(actual);
            } catch (Throwable th) {
                Option nonFatal = NonFatal$.MODULE$.unapply(th);
                if (nonFatal.isEmpty()) {
                    // Fatal errors pass straight through without cleanup.
                    throw th;
                }
                Throwable bindFailure = (Throwable) nonFatal.get();
                this.org$apache$pekko$remote$transport$netty$NettyTransport$$log.error("failed to bind to {}, shutting down Netty transport", inetSocketAddress);
                try {
                    shutdown();
                } catch (Throwable shutdownFailure) {
                    if (NonFatal$.MODULE$.unapply(shutdownFailure).isEmpty()) {
                        throw shutdownFailure;
                    }
                    // Non-fatal shutdown problems are deliberately ignored so that the
                    // original bind failure is what the caller sees.
                }
                throw bindFailure;
            }
        }, executionContext());
    }

    /** @return the address recorded by {@code listen()}, or {@code null} if it has not bound yet */
    public Address boundAddress() {
        return boundTo;
    }

    /**
     * Opens an outbound connection to {@code address} and returns the resulting
     * association handle.
     *
     * <p>Fixes relative to the raw decompiler output: the server channel is read once
     * and null-checked, so calling this before {@code listen()} yields the
     * "Transport is not bound" failure instead of a {@code NullPointerException}; and
     * the SSL handshake lambda now passes the connected channel instead of an
     * undefined placeholder.
     *
     * @param address remote address to connect to
     * @return future of the association handle taken from the channel's client handler;
     *         failures are passed through the recovery function
     *         {@code NettyTransport$$anon$7}
     */
    @Override // org.apache.pekko.remote.transport.Transport
    public Future<AssociationHandle> associate(Address address) {
        // Single volatile read; the field stays null until listen() has bound.
        Channel boundChannel = this.serverChannel;
        if (boundChannel == null || !boundChannel.isBound()) {
            return Future$.MODULE$.failed(new NettyTransportException("Transport is not bound"));
        }
        ClientBootstrap outboundBootstrap = outboundBootstrap(address);
        return addressToSocketAddress(address).flatMap(inetSocketAddress -> {
            return NettyFutureBridge$.MODULE$.apply(outboundBootstrap.connect(inetSocketAddress)).map(channel -> {
                if (settings().EnableSsl()) {
                    // Wait for the SSL handshake; marked as blocking for the execution context.
                    scala.concurrent.package$.MODULE$.blocking(() -> {
                        return associate$$anonfun$1$$anonfun$1$$anonfun$1(channel);
                    });
                }
                channel.setReadable(false);
                return channel;
            }, executionContext()).flatMap(channel2 -> {
                return channel2.getPipeline().get(ClientHandler.class).statusFuture().map(associationHandle -> {
                    return associationHandle;
                }, executionContext());
            }, executionContext());
        }, executionContext()).recover(new NettyTransport$$anon$7(), executionContext());
    }

    /**
     * Starts the shutdown sequence: unbind the channel group, then continue with the
     * write / disconnect / close steps implemented by the chained helper methods.
     *
     * @return future produced by the chained shutdown steps
     */
    @Override // org.apache.pekko.remote.transport.Transport
    public Future<Object> shutdown() {
        Future unbindOutcome = always$1(channelGroup().unbind());
        return unbindOutcome.flatMap(unbindOk -> {
            return shutdown$$anonfun$1(BoxesRunTime.unboxToBoolean(unbindOk));
        }, executionContext());
    }

    /**
     * Constructor helper: reads the remoting {@code Dispatcher} setting from the
     * system's remote provider.
     *
     * @return {@code None} when the setting equals the empty string, otherwise
     *         {@code Some} of the setting
     */
    private static final Option $init$$$anonfun$3(ExtendedActorSystem extendedActorSystem) {
        RARP remoting = (RARP) RARP$.MODULE$.apply((ActorSystem) extendedActorSystem);
        String dispatcherId = remoting.provider().remoteSettings().Dispatcher();
        if ("".equals(dispatcherId)) {
            return None$.MODULE$;
        }
        return Some$.MODULE$.apply(dispatcherId);
    }

    /** Constructor helper: supplies the actor system's own dispatcher as the fallback execution context. */
    private static final ExecutionContextExecutor $init$$$anonfun$5(ExtendedActorSystem extendedActorSystem) {
        final ExecutionContextExecutor systemDispatcher = extendedActorSystem.dispatcher();
        return systemDispatcher;
    }

    /** Fallback for {@code createExecutorService}: a new cached thread pool using the system's thread factory. */
    private final Executor createExecutorService$$anonfun$2() {
        final Executor cachedPool = Executors.newCachedThreadPool(system().threadFactory());
        return cachedPool;
    }

    /**
     * Resolves {@code str} with {@link InetAddress#getByName(String)} and pairs the
     * result with port {@code i}.
     */
    private static final InetSocketAddress addressToSocketAddress$$anonfun$1$$anonfun$1(String str, int i) {
        final InetAddress resolvedHost = InetAddress.getByName(str);
        return new InetSocketAddress(resolvedHost, i);
    }

    /**
     * Runs the host lookup inside Scala's {@code blocking} marker.
     *
     * <p>Fix relative to the raw decompiler output: the inner lambda now passes this
     * method's own {@code str} and {@code i} instead of undefined placeholders.
     *
     * @param str host name or address literal to resolve
     * @param i   port number
     * @return the resolved socket address
     */
    private static final InetSocketAddress addressToSocketAddress$$anonfun$1(String str, int i) {
        return (InetSocketAddress) scala.concurrent.package$.MODULE$.blocking(() -> {
            return addressToSocketAddress$$anonfun$1$$anonfun$1(str, i);
        });
    }

    /**
     * Starts the SSL handshake on the channel's {@code SslHandler} and waits
     * uninterruptibly for it to finish.
     *
     * @return the handshake future after waiting on it
     */
    private static final ChannelFuture associate$$anonfun$1$$anonfun$1$$anonfun$1(Channel channel) {
        SslHandler ssl = channel.getPipeline().get(SslHandler.class);
        ChannelFuture handshake = ssl.handshake();
        return handshake.awaitUninterruptibly();
    }

    /**
     * Converts a Netty group future into a Scala future that yields {@code true} on
     * success; failures go through the recovery function {@code NettyTransport$$anon$8}.
     * NOTE(review): the recovery class is not visible here — presumably it maps a
     * failure to {@code false}; confirm.
     */
    private final Future always$1(ChannelGroupFuture channelGroupFuture) {
        Future bridged = NettyFutureBridge$.MODULE$.apply(channelGroupFuture);
        Future succeeded = bridged.map(ignoredGroup -> {
            return true;
        }, executionContext());
        return succeeded.recover(new NettyTransport$$anon$8(), executionContext());
    }

    /**
     * Final shutdown step: releases the channel factories and combines the outcomes of
     * the four preceding steps.
     *
     * @param z  outcome of the unbind step
     * @param z2 outcome of the write step
     * @param z3 outcome of the disconnect step
     * @param z4 outcome of the close step
     * @return {@code true} only if all four outcomes are {@code true}
     */
    private final /* synthetic */ boolean shutdown$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$1(boolean z, boolean z2, boolean z3, boolean z4) {
        boolean ioOnDispatcher = settings().UseDispatcherForIo().isDefined();
        if (!ioOnDispatcher) {
            // The factories created their own thread pools, so release them as well.
            this.clientChannelFactory.releaseExternalResources();
            this.serverChannelFactory.releaseExternalResources();
        } else {
            // Executors come from a dispatcher; only the factories themselves are shut down.
            this.clientChannelFactory.shutdown();
            this.serverChannelFactory.shutdown();
        }
        boolean allStepsSucceeded = z2 && z && z3 && z4;
        return allStepsSucceeded;
    }

    /**
     * Shutdown step after disconnect: closes the channel group, then runs the final step.
     *
     * @param z  outcome of the unbind step
     * @param z2 outcome of the write step
     * @param z3 outcome of the disconnect step
     */
    private final /* synthetic */ Future shutdown$$anonfun$1$$anonfun$1$$anonfun$1(boolean z, boolean z2, boolean z3) {
        Future closeOutcome = always$1(channelGroup().close());
        return closeOutcome.map(closeOk -> {
            return shutdown$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$1(z, z2, z3, BoxesRunTime.unboxToBoolean(closeOk));
        }, executionContext());
    }

    /**
     * Shutdown step after the write: disconnects the channel group, then continues
     * with the close step.
     *
     * @param z  outcome of the unbind step
     * @param z2 outcome of the write step
     */
    private final /* synthetic */ Future shutdown$$anonfun$1$$anonfun$1(boolean z, boolean z2) {
        Future disconnectOutcome = always$1(channelGroup().disconnect());
        return disconnectOutcome.flatMap(disconnectOk -> {
            return shutdown$$anonfun$1$$anonfun$1$$anonfun$1(z, z2, BoxesRunTime.unboxToBoolean(disconnectOk));
        }, executionContext());
    }

    /**
     * Shutdown step after unbind: writes an empty buffer to the channel group, then
     * continues with the disconnect step.
     *
     * @param z outcome of the unbind step
     */
    private final /* synthetic */ Future shutdown$$anonfun$1(boolean z) {
        Future writeOutcome = always$1(channelGroup().write(ChannelBuffers.buffer(0)));
        return writeOutcome.flatMap(writeOk -> {
            return shutdown$$anonfun$1$$anonfun$1(z, BoxesRunTime.unboxToBoolean(writeOk));
        }, executionContext());
    }
}
