package io.scalecube.transport;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.scalecube.Preconditions;
import java.net.BindException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/scalecube/transport/TransportImpl.class */
public final class TransportImpl implements Transport {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransportImpl.class);
    private static final CompletableFuture<Void> COMPLETED_PROMISE = CompletableFuture.completedFuture(null);
    private final TransportConfig config;
    private final BootstrapFactory bootstrapFactory;
    private final MessageToByteEncoder<Message> serializerHandler;
    private final MessageToMessageDecoder<ByteBuf> deserializerHandler;
    private final MessageHandler messageHandler;
    private NetworkEmulator networkEmulator;
    private NetworkEmulatorHandler networkEmulatorHandler;
    private Address address;
    private ServerChannel serverChannel;
    private final FluxProcessor<Message, Message> incomingMessagesSubject = DirectProcessor.create().serialize();
    private final FluxSink<Message> messageSink = this.incomingMessagesSubject.sink();
    private final Map<Address, ChannelFuture> outgoingChannels = new ConcurrentHashMap();
    private final IncomingChannelInitializer incomingChannelInitializer = new IncomingChannelInitializer();
    private final ExceptionHandler exceptionHandler = new ExceptionHandler();
    private volatile boolean stopped = false;

    /* JADX INFO: Access modifiers changed from: private */
    @ChannelHandler.Sharable
    /* loaded from: input_file:io/scalecube/transport/TransportImpl$IncomingChannelInitializer.class */
    public final class IncomingChannelInitializer extends ChannelInitializer {
        private IncomingChannelInitializer() {
        }

        @Override // io.netty.channel.ChannelInitializer
        protected void initChannel(Channel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new ProtobufVarint32FrameDecoder());
            pipeline.addLast(TransportImpl.this.deserializerHandler);
            pipeline.addLast(TransportImpl.this.messageHandler);
            pipeline.addLast(TransportImpl.this.exceptionHandler);
        }
    }

    @ChannelHandler.Sharable
    /* loaded from: input_file:io/scalecube/transport/TransportImpl$OutgoingChannelInitializer.class */
    private final class OutgoingChannelInitializer extends ChannelInitializer {
        private final Address address;

        public OutgoingChannelInitializer(Address address) {
            this.address = address;
        }

        @Override // io.netty.channel.ChannelInitializer
        protected void initChannel(Channel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new ChannelDuplexHandler() { // from class: io.scalecube.transport.TransportImpl.OutgoingChannelInitializer.1
                @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
                public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
                    TransportImpl.LOGGER.debug("Disconnected from: {} {}", OutgoingChannelInitializer.this.address, channelHandlerContext.channel());
                    TransportImpl.this.outgoingChannels.remove(OutgoingChannelInitializer.this.address);
                    super.channelInactive(channelHandlerContext);
                }
            });
            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast(TransportImpl.this.serializerHandler);
            if (TransportImpl.this.networkEmulatorHandler != null) {
                pipeline.addLast(TransportImpl.this.networkEmulatorHandler);
            }
            pipeline.addLast(TransportImpl.this.exceptionHandler);
        }
    }

    public TransportImpl(TransportConfig transportConfig) {
        Preconditions.checkArgument(transportConfig != null);
        this.config = transportConfig;
        this.serializerHandler = new MessageSerializerHandler();
        this.deserializerHandler = new MessageDeserializerHandler();
        this.messageHandler = new MessageHandler(this.messageSink);
        this.bootstrapFactory = new BootstrapFactory(transportConfig);
    }

    public CompletableFuture<Transport> bind0() {
        ServerBootstrap childHandler = this.bootstrapFactory.serverBootstrap().childHandler(this.incomingChannelInitializer);
        InetAddress localIpAddress = Addressing.getLocalIpAddress(this.config.getListenAddress(), this.config.getListenInterface(), this.config.isPreferIPv6());
        int port = this.config.getPort();
        return bind0(childHandler, localIpAddress, port, port + this.config.getPortCount());
    }

    private CompletableFuture<Transport> bind0(ServerBootstrap serverBootstrap, InetAddress inetAddress, int i, int i2) {
        CompletableFuture<Transport> completableFuture = new CompletableFuture<>();
        if (i < 1100 || i > 65535) {
            completableFuture.completeExceptionally(new IllegalArgumentException("Invalid port number: " + i));
            return completableFuture;
        }
        if (i > i2) {
            completableFuture.completeExceptionally(new NoSuchElementException("Could not find an available port from: " + i + " to: " + i2));
            return completableFuture;
        }
        this.address = Address.create(inetAddress.getHostAddress(), i);
        serverBootstrap.bind(inetAddress, this.address.port()).addListener2((GenericFutureListener<? extends Future<? super Void>>) channelFuture -> {
            if (channelFuture.isSuccess()) {
                this.serverChannel = (ServerChannel) channelFuture.channel();
                this.networkEmulator = new NetworkEmulator(this.address, this.config.isUseNetworkEmulator());
                this.networkEmulatorHandler = this.config.isUseNetworkEmulator() ? new NetworkEmulatorHandler(this.networkEmulator) : null;
                LOGGER.info("Bound to: {}", this.address);
                completableFuture.complete(this);
                return;
            }
            Throwable cause = channelFuture.cause();
            if (!this.config.isPortAutoIncrement() || !isAddressAlreadyInUseException(cause)) {
                LOGGER.error("Failed to bind to: {}, cause: {}", this.address, cause);
                completableFuture.completeExceptionally(cause);
            } else {
                LOGGER.warn("Can't bind to address {}, try again on different port [cause={}]", this.address, cause.toString());
                CompletableFuture<Transport> bind0 = bind0(serverBootstrap, inetAddress, i + 1, i2);
                completableFuture.getClass();
                bind0.thenAccept((v1) -> {
                    r1.complete(v1);
                });
            }
        });
        return completableFuture;
    }

    private boolean isAddressAlreadyInUseException(Throwable th) {
        return (th instanceof BindException) || (th.getMessage() != null && th.getMessage().contains("Address already in use"));
    }

    @Override // io.scalecube.transport.Transport
    public Address address() {
        return this.address;
    }

    @Override // io.scalecube.transport.Transport
    public boolean isStopped() {
        return this.stopped;
    }

    @Override // io.scalecube.transport.Transport
    public NetworkEmulator networkEmulator() {
        return this.networkEmulator;
    }

    @Override // io.scalecube.transport.Transport
    public final void stop() {
        stop(COMPLETED_PROMISE);
    }

    @Override // io.scalecube.transport.Transport
    public final void stop(CompletableFuture<Void> completableFuture) {
        Preconditions.checkState(!this.stopped, "Transport is stopped");
        Preconditions.checkArgument(completableFuture != null);
        this.stopped = true;
        try {
            this.messageSink.complete();
        } catch (Exception e) {
        }
        Iterator<Address> it = this.outgoingChannels.keySet().iterator();
        while (it.hasNext()) {
            ChannelFuture channelFuture = this.outgoingChannels.get(it.next());
            if (channelFuture != null) {
                if (channelFuture.isSuccess()) {
                    channelFuture.channel().close();
                } else {
                    channelFuture.addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelFutureListener.CLOSE);
                }
            }
        }
        this.outgoingChannels.clear();
        if (this.serverChannel != null) {
            composeFutures(this.serverChannel.close(), completableFuture);
        }
        this.bootstrapFactory.shutdown();
    }

    @Override // io.scalecube.transport.Transport
    public final Flux<Message> listen() {
        Preconditions.checkState(!this.stopped, "Transport is stopped");
        return this.incomingMessagesSubject.onBackpressureBuffer();
    }

    @Override // io.scalecube.transport.Transport
    public void send(Address address, Message message) {
        send(address, message, COMPLETED_PROMISE);
    }

    @Override // io.scalecube.transport.Transport
    public void send(Address address, Message message, CompletableFuture<Void> completableFuture) {
        Preconditions.checkState(!this.stopped, "Transport is stopped");
        Preconditions.checkArgument(address != null);
        Preconditions.checkArgument(message != null);
        Preconditions.checkArgument(completableFuture != null);
        message.setSender(this.address);
        ChannelFuture computeIfAbsent = this.outgoingChannels.computeIfAbsent(address, this::connect);
        if (computeIfAbsent.isSuccess()) {
            send(computeIfAbsent.channel(), message, completableFuture);
        } else {
            computeIfAbsent.addListener2(channelFuture -> {
                if (channelFuture.isSuccess()) {
                    send(computeIfAbsent.channel(), message, (CompletableFuture<Void>) completableFuture);
                } else {
                    completableFuture.completeExceptionally(channelFuture.cause());
                }
            });
        }
    }

    private void send(Channel channel, Message message, CompletableFuture<Void> completableFuture) {
        if (completableFuture.equals(COMPLETED_PROMISE)) {
            channel.writeAndFlush(message, channel.voidPromise());
        } else {
            composeFutures(channel.writeAndFlush(message), completableFuture);
        }
    }

    private void composeFutures(ChannelFuture channelFuture, CompletableFuture<Void> completableFuture) {
        channelFuture.addListener2(channelFuture2 -> {
            if (channelFuture.isSuccess()) {
                completableFuture.complete(channelFuture.get());
            } else {
                completableFuture.completeExceptionally(channelFuture.cause());
            }
        });
    }

    private ChannelFuture connect(Address address) {
        ChannelFuture connect = this.bootstrapFactory.clientBootstrap().handler(new OutgoingChannelInitializer(address)).connect(address.host(), address.port());
        connect.addListener2((GenericFutureListener<? extends Future<? super Void>>) channelFuture -> {
            if (channelFuture.isSuccess()) {
                LOGGER.debug("Connected from {} to {}: {}", this.address, address, channelFuture.channel());
            } else {
                LOGGER.warn("Failed to connect from {} to {}", this.address, address);
                this.outgoingChannels.remove(address);
            }
        });
        return connect;
    }
}
