package io.scalecube.transport;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/scalecube/transport/TransportImpl.class */
public final class TransportImpl implements Transport {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransportImpl.class);

    /** Transport settings supplied at construction time; never null. */
    private final TransportConfig config;

    /** Source of the server and client bootstraps; shut down by {@link #stop()}. */
    private final BootstrapFactory bootstrapFactory;

    // The four fields below are assigned in the bind success callback and stay null until then.
    // NOTE(review): they are written from that callback and read from other methods without
    // synchronization -- confirm that visibility is acceptable for the callers.
    private NetworkEmulator networkEmulator;

    /** Stays {@code null} after bind when the configuration does not enable the network emulator. */
    private NetworkEmulatorHandler networkEmulatorHandler;

    /** Local address derived from the bound server channel. */
    private Address address;

    private ServerChannel serverChannel;

    /**
     * Serialized processor that fans incoming messages out to {@link #listen()} subscribers.
     * The explicit {@code <Message>} type witness is required: without it the chained
     * {@code create().serialize()} call is inferred as {@code FluxProcessor<Object, Object>}.
     */
    private final FluxProcessor<Message, Message> incomingMessagesSubject =
        DirectProcessor.<Message>create().serialize();

    /** Sink feeding {@link #incomingMessagesSubject}; completed when the transport stops. */
    private final FluxSink<Message> messageSink = this.incomingMessagesSubject.sink();

    /** Cached connect results keyed by remote address; entries are removed on failure or disconnect. */
    private final Map<Address, Mono<Channel>> outgoingChannels = new ConcurrentHashMap<>();

    private final IncomingChannelInitializer incomingChannelInitializer = new IncomingChannelInitializer();

    /** Shared tail handler installed in both incoming and outgoing pipelines. */
    private final ExceptionHandler exceptionHandler = new ExceptionHandler();

    /** Set once by {@link #stop()}; checked by {@link #send(Address, Message)}. */
    private volatile boolean stopped = false;

    private final MessageToByteEncoder<Message> serializerHandler = new MessageSerializerHandler();
    private final MessageToMessageDecoder<ByteBuf> deserializerHandler = new MessageDeserializerHandler();

    /** Inbound handler constructed with {@link #messageSink}, so it must be declared after the sink. */
    private final MessageHandler messageHandler = new MessageHandler(this.messageSink);

    @ChannelHandler.Sharable
    /* loaded from: input_file:io/scalecube/transport/TransportImpl$IncomingChannelInitializer.class */
    private final class IncomingChannelInitializer extends ChannelInitializer {
        private IncomingChannelInitializer() {
        }

        protected void initChannel(Channel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new ChannelHandler[]{new ProtobufVarint32FrameDecoder()});
            pipeline.addLast(new ChannelHandler[]{TransportImpl.this.deserializerHandler});
            pipeline.addLast(new ChannelHandler[]{TransportImpl.this.messageHandler});
            pipeline.addLast(new ChannelHandler[]{TransportImpl.this.exceptionHandler});
        }
    }

    @ChannelHandler.Sharable
    /* loaded from: input_file:io/scalecube/transport/TransportImpl$OutgoingChannelInitializer.class */
    private final class OutgoingChannelInitializer extends ChannelInitializer {
        private final Address address;

        public OutgoingChannelInitializer(Address address) {
            this.address = address;
        }

        protected void initChannel(Channel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new ChannelHandler[]{new ChannelDuplexHandler() { // from class: io.scalecube.transport.TransportImpl.OutgoingChannelInitializer.1
                public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
                    TransportImpl.LOGGER.debug("Disconnected from: {} {}", OutgoingChannelInitializer.this.address, channelHandlerContext.channel());
                    TransportImpl.this.outgoingChannels.remove(OutgoingChannelInitializer.this.address);
                    super.channelInactive(channelHandlerContext);
                }
            }});
            pipeline.addLast(new ChannelHandler[]{new ProtobufVarint32LengthFieldPrepender()});
            pipeline.addLast(new ChannelHandler[]{TransportImpl.this.serializerHandler});
            if (TransportImpl.this.networkEmulatorHandler != null) {
                pipeline.addLast(new ChannelHandler[]{TransportImpl.this.networkEmulatorHandler});
            }
            pipeline.addLast(new ChannelHandler[]{TransportImpl.this.exceptionHandler});
        }
    }

    /**
     * Creates an unbound transport.
     *
     * @param transportConfig transport settings; must not be null
     * @throws NullPointerException if {@code transportConfig} is null
     */
    public TransportImpl(TransportConfig transportConfig) {
        Objects.requireNonNull(transportConfig);
        this.config = transportConfig;
        this.bootstrapFactory = new BootstrapFactory(transportConfig);
    }

    /**
     * Binds this transport on the port taken from the configuration.
     *
     * @return a lazy mono that performs the bind on subscription and emits this transport
     */
    public Mono<Transport> bind0() {
        // The port is read at subscription time, not when this method is called.
        return Mono.defer(() -> bind0(this.config.getPort()));
    }

    /**
     * Binds the server bootstrap on the given port and, on success, records the server channel,
     * the local address and the network emulator state.
     *
     * @param port port to bind on
     * @return a lazy mono that emits this transport once the bind succeeded
     */
    private Mono<Transport> bind0(int port) {
        return Mono.defer(() -> {
            ChannelFuture bindFuture = this.bootstrapFactory.serverBootstrap()
                .childHandler(this.incomingChannelInitializer)
                .bind(port);
            return toMono(bindFuture)
                .doOnSuccess(channel -> {
                    ServerChannel bound = (ServerChannel) channel;
                    this.serverChannel = bound;
                    this.address = toAddress(bound.localAddress());
                    this.networkEmulator = new NetworkEmulator(this.address, this.config.isUseNetworkEmulator());
                    // The emulator handler exists only when emulation is enabled in the config.
                    if (this.config.isUseNetworkEmulator()) {
                        this.networkEmulatorHandler = new NetworkEmulatorHandler(this.networkEmulator);
                    } else {
                        this.networkEmulatorHandler = null;
                    }
                    LOGGER.info("Bound cluster transport on: {}", this.address);
                })
                .doOnError(th -> LOGGER.error("Failed to bind cluster transport on port={}, cause: {}", port, th))
                .thenReturn(this);
        });
    }

    /**
     * Returns the local address of this transport.
     *
     * @return the address recorded on successful bind, or {@code null} if it has not been set yet
     */
    @Override // io.scalecube.transport.Transport
    public Address address() {
        return address;
    }

    /**
     * Tells whether {@link #stop()} has already been executed.
     *
     * @return {@code true} once the stopped flag has been set
     */
    @Override // io.scalecube.transport.Transport
    public boolean isStopped() {
        return stopped;
    }

    /**
     * Returns the network emulator attached to this transport.
     *
     * @return the emulator created on successful bind, or {@code null} if it has not been set yet
     */
    @Override // io.scalecube.transport.Transport
    public NetworkEmulator networkEmulator() {
        return networkEmulator;
    }

    /**
     * Stops this transport: completes the incoming message stream, closes the server channel and
     * every cached outgoing channel, and shuts down the bootstrap factory.
     *
     * <p>All work happens on subscription. The close of each channel is initiated before the
     * bootstrap factory is shut down; the returned mono completes once all of those closes finished.
     *
     * @return a lazy mono completing when the server channel and outgoing channels are closed;
     *     it fails with {@link IllegalStateException} if the transport was already stopped
     */
    @Override // io.scalecube.transport.Transport
    public final Mono<Void> stop() {
        return Mono.defer(() -> {
            if (this.stopped) {
                throw new IllegalStateException("Transport is stopped");
            }
            this.stopped = true;

            // Best effort: a failure to complete the stream must not prevent releasing channels.
            try {
                this.messageSink.complete();
            } catch (Exception e) {
                LOGGER.debug("Ignoring failure while completing incoming message stream", e);
            }

            ArrayList<Mono<Void>> pendingCloses = new ArrayList<>();

            // close() is invoked right away; only the wait for its result is deferred.
            ServerChannel server = this.serverChannel;
            if (server != null) {
                pendingCloses.add(toMono(server.close()).then());
            }

            for (Mono<Channel> connection : this.outgoingChannels.values()) {
                // A connection that failed to establish has no channel to close, so its error is
                // mapped to empty. cache() plus the immediate subscribe() starts the close now
                // (before the bootstrap factory shutdown) and lets Mono.when observe the same
                // result later. The mono is added to the list synchronously on this thread,
                // never from a callback.
                Mono<Void> closed = connection
                    .onErrorResume(th -> Mono.empty())
                    .flatMap(channel -> toMono(channel.close()))
                    .then()
                    .cache();
                // Errors are intentionally not handled here; they are replayed to Mono.when below.
                closed.subscribe(unused -> { }, th -> { });
                pendingCloses.add(closed);
            }
            this.outgoingChannels.clear();

            this.bootstrapFactory.shutdown();
            return Mono.when(pendingCloses);
        });
    }

    /**
     * Exposes the stream of incoming messages.
     *
     * @return the incoming message processor with {@code onBackpressureBuffer()} applied
     */
    @Override // io.scalecube.transport.Transport
    public final Flux<Message> listen() {
        return incomingMessagesSubject.onBackpressureBuffer();
    }

    /**
     * Sends a message to the given address over a cached or newly opened channel.
     *
     * <p>All checks run on subscription. The message's sender is overwritten with this
     * transport's local address before it is written.
     *
     * @param address destination address; must not be null
     * @param message message to send; must not be null
     * @return a lazy mono completing when the write-and-flush finished; it fails with
     *     {@link IllegalStateException} if the transport is stopped, or
     *     {@link NullPointerException} if an argument is null
     */
    @Override // io.scalecube.transport.Transport
    public Mono<Void> send(Address address, Message message) {
        return Mono.defer(() -> {
            if (this.stopped) {
                throw new IllegalStateException("Transport is stopped");
            }
            Objects.requireNonNull(address);
            Objects.requireNonNull(message);
            message.setSender(this.address);

            Mono<Void> written = getOrConnect(address)
                .flatMap(channel -> toMono(channel.writeAndFlush(message)).then());
            return written.doOnError(th ->
                LOGGER.debug("Failed to send {} from {} to {}, cause: {}", message, this.address, address, th));
        });
    }

    /**
     * Returns the channel for the given address, starting a connect if none is cached.
     *
     * <p>On subscription the cached connect mono for {@code address} is looked up (or created via
     * {@link #connect0(Address)}) and its result is forwarded to the subscriber.
     *
     * @param address remote address to obtain a channel for
     * @return a lazy mono emitting the connected channel, or failing with the connect error
     */
    private Mono<Channel> getOrConnect(Address address) {
        return Mono.create(sink ->
            this.outgoingChannels
                .computeIfAbsent(address, this::connect0)
                .subscribe(sink::success, sink::error));
    }

    /**
     * Builds the cacheable connect mono for the given address.
     *
     * <p>A successful connect is logged; a failed one removes the address from the outgoing
     * channel cache. The result is wrapped in {@code cache()} before being returned.
     *
     * @param address remote address to connect to
     * @return cached mono emitting the connected channel
     */
    private Mono<Channel> connect0(Address address) {
        Mono<Channel> connecting = connect1(address);
        Mono<Channel> logged = connecting.doOnSuccess(channel ->
            LOGGER.debug("Connected from {} to {}: {}", this.address, address, channel));
        // Evict on failure so a later send does not reuse the failed attempt.
        Mono<Channel> evicting = logged.doOnError(th -> this.outgoingChannels.remove(address));
        return evicting.cache();
    }

    /**
     * Opens a client connection to the given address.
     *
     * <p>On subscription a client bootstrap is configured with a new
     * {@link OutgoingChannelInitializer} and connected to the address's host and port. When the
     * connect future completes, its outcome is forwarded to the subscriber.
     *
     * @param address remote address to connect to
     * @return a lazy mono emitting the connected channel, or failing with the connect cause
     */
    private Mono<Channel> connect1(Address address) {
        return Mono.create(sink -> {
            // NOTE(review): if clientBootstrap() hands out a shared Bootstrap instance, setting the
            // handler here could race with concurrent connects to other addresses -- confirm.
            ChannelFuture connectFuture = this.bootstrapFactory.clientBootstrap()
                .handler(new OutgoingChannelInitializer(address))
                .connect(address.host(), address.port());
            // The future is already complete inside the listener, so its result is forwarded directly.
            connectFuture.addListener(completed -> toMono0(sink, connectFuture));
        });
    }

    /**
     * Converts a socket address into a transport {@link Address}.
     *
     * @param socketAddress address to convert; cast to {@link InetSocketAddress}
     * @return address created from the host string and port of {@code socketAddress}
     * @throws ClassCastException if {@code socketAddress} is not an {@link InetSocketAddress}
     */
    private static Address toAddress(SocketAddress socketAddress) {
        InetSocketAddress inet = (InetSocketAddress) socketAddress;
        String host = inet.getHostString();
        int port = inet.getPort();
        return Address.create(host, port);
    }

    /**
     * Adapts a Netty {@link ChannelFuture} to a {@link Mono}.
     *
     * <p>The listener is registered on subscription. It forwards the outcome of
     * {@code channelFuture} itself rather than the listener argument, whose static type here is
     * a plain future and not a {@link ChannelFuture}.
     *
     * @param channelFuture future to adapt
     * @return a lazy mono emitting the future's channel on success, or failing with its cause
     */
    private static Mono<Channel> toMono(ChannelFuture channelFuture) {
        return Mono.create(sink ->
            channelFuture.addListener(completed -> toMono0(sink, channelFuture)));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Forwards the outcome of a channel future to a mono sink.
     *
     * @param monoSink sink to signal
     * @param channelFuture future whose outcome is forwarded: its channel on success, its cause otherwise
     */
    public static void toMono0(MonoSink<Channel> monoSink, ChannelFuture channelFuture) {
        if (!channelFuture.isSuccess()) {
            monoSink.error(channelFuture.cause());
            return;
        }
        monoSink.success(channelFuture.channel());
    }
}
