/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.transport;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import net.nmoncho.shaded.com.google.common.base.Strings;
import net.nmoncho.shaded.io.netty.bootstrap.ServerBootstrap;
import net.nmoncho.shaded.io.netty.buffer.ByteBuf;
import net.nmoncho.shaded.io.netty.channel.Channel;
import net.nmoncho.shaded.io.netty.channel.ChannelFuture;
import net.nmoncho.shaded.io.netty.channel.ChannelHandler;
import net.nmoncho.shaded.io.netty.channel.ChannelHandlerContext;
import net.nmoncho.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
import net.nmoncho.shaded.io.netty.channel.ChannelInitializer;
import net.nmoncho.shaded.io.netty.channel.ChannelOption;
import net.nmoncho.shaded.io.netty.channel.ChannelPipeline;
import net.nmoncho.shaded.io.netty.channel.EventLoopGroup;
import net.nmoncho.shaded.io.netty.channel.WriteBufferWaterMark;
import net.nmoncho.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
import net.nmoncho.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import net.nmoncho.shaded.io.netty.handler.codec.ByteToMessageDecoder;
import net.nmoncho.shaded.io.netty.handler.logging.LogLevel;
import net.nmoncho.shaded.io.netty.handler.logging.LoggingHandler;
import net.nmoncho.shaded.io.netty.handler.ssl.SslContext;
import net.nmoncho.shaded.io.netty.handler.ssl.SslHandler;
import net.nmoncho.shaded.io.netty.handler.timeout.IdleStateEvent;
import net.nmoncho.shaded.io.netty.handler.timeout.IdleStateHandler;
import net.nmoncho.shaded.io.netty.util.Version;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.net.AbstractMessageHandler;
import org.apache.cassandra.net.BufferPoolAllocator;
import org.apache.cassandra.net.FrameDecoder;
import org.apache.cassandra.net.FrameDecoderCrc;
import org.apache.cassandra.net.FrameDecoderLZ4;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.net.FrameEncoderCrc;
import org.apache.cassandra.net.FrameEncoderLZ4;
import org.apache.cassandra.net.GlobalBufferPoolAllocator;
import org.apache.cassandra.security.ISslContextFactory;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.transport.CBUtil;
import org.apache.cassandra.transport.CQLMessageHandler;
import org.apache.cassandra.transport.ClientResourceLimits;
import org.apache.cassandra.transport.Connection;
import org.apache.cassandra.transport.ConnectionLimitHandler;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.transport.Envelope;
import org.apache.cassandra.transport.ExceptionHandlers;
import org.apache.cassandra.transport.InitialConnectionHandler;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.PreV5Handlers;
import org.apache.cassandra.transport.ProtocolException;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.QueueBackpressure;
import org.apache.cassandra.transport.ServerConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelineConfigurator {
    private static final Logger logger = LoggerFactory.getLogger(PipelineConfigurator.class);
    private static final boolean DEBUG = Boolean.getBoolean("cassandra.unsafe_verbose_debug_client_protocol");
    public static final String SSL_FACTORY_CONTEXT_DESCRIPTION = "client_encryption_options";
    private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
    private static final String CONNECTION_LIMIT_HANDLER = "connectionLimitHandler";
    private static final String IDLE_STATE_HANDLER = "idleStateHandler";
    private static final String INITIAL_HANDLER = "initialHandler";
    private static final String EXCEPTION_HANDLER = "exceptionHandler";
    private static final String DEBUG_HANDLER = "debugHandler";
    private static final String SSL_HANDLER = "ssl";
    private static final String ENVELOPE_DECODER = "envelopeDecoder";
    private static final String ENVELOPE_ENCODER = "envelopeEncoder";
    private static final String MESSAGE_DECOMPRESSOR = "decompressor";
    private static final String MESSAGE_COMPRESSOR = "compressor";
    private static final String MESSAGE_DECODER = "messageDecoder";
    private static final String MESSAGE_ENCODER = "messageEncoder";
    private static final String LEGACY_MESSAGE_PROCESSOR = "legacyCqlProcessor";
    private static final String FRAME_DECODER = "frameDecoder";
    private static final String FRAME_ENCODER = "frameEncoder";
    private static final String MESSAGE_PROCESSOR = "cqlProcessor";
    private final boolean epoll;
    private final boolean keepAlive;
    private final EncryptionOptions.TlsEncryptionPolicy tlsEncryptionPolicy;
    private final Dispatcher dispatcher;
    private final QueueBackpressure queueBackpressure;

    public PipelineConfigurator(boolean epoll, boolean keepAlive, EncryptionOptions.TlsEncryptionPolicy encryptionPolicy, Dispatcher dispatcher) {
        this.epoll = epoll;
        this.keepAlive = keepAlive;
        this.tlsEncryptionPolicy = encryptionPolicy;
        this.dispatcher = dispatcher;
        this.queueBackpressure = QueueBackpressure.DEFAULT;
    }

    @Deprecated
    @VisibleForTesting
    public PipelineConfigurator(boolean epoll, boolean keepAlive, boolean useLegacyFlusher, EncryptionOptions.TlsEncryptionPolicy encryptionPolicy) {
        this.epoll = epoll;
        this.keepAlive = keepAlive;
        this.tlsEncryptionPolicy = encryptionPolicy;
        this.dispatcher = new Dispatcher(useLegacyFlusher);
        this.queueBackpressure = QueueBackpressure.DEFAULT;
    }

    public ChannelFuture initializeChannel(EventLoopGroup workerGroup, InetSocketAddress socket, Connection.Factory connectionFactory) {
        ServerBootstrap bootstrap = ((ServerBootstrap)new ServerBootstrap().channel(this.epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)).childOption(ChannelOption.TCP_NODELAY, (Object)true).childOption(ChannelOption.SO_LINGER, (Object)0).childOption(ChannelOption.SO_KEEPALIVE, (Object)this.keepAlive).childOption(ChannelOption.ALLOCATOR, (Object)CBUtil.allocator).childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, (Object)new WriteBufferWaterMark(8192, 32768));
        if (workerGroup != null) {
            bootstrap = bootstrap.group(workerGroup);
        }
        ChannelInitializer<Channel> initializer = this.initializer(connectionFactory);
        bootstrap.childHandler(initializer);
        logger.info("Using Netty Version: {}", Version.identify().entrySet());
        logger.info("Starting listening for CQL clients on {} ({})...", (Object)socket, (Object)this.tlsEncryptionPolicy.description());
        return bootstrap.bind((SocketAddress)socket);
    }

    protected ChannelInitializer<Channel> initializer(final Connection.Factory connectionFactory) {
        final EncryptionConfig encryptionConfig = this.encryptionConfig();
        return new ChannelInitializer<Channel>(){

            protected void initChannel(Channel channel) throws Exception {
                PipelineConfigurator.this.configureInitialPipeline(channel, connectionFactory);
                encryptionConfig.applyTo(channel);
            }
        };
    }

    protected EncryptionConfig encryptionConfig() {
        EncryptionOptions encryptionOptions = DatabaseDescriptor.getNativeProtocolEncryptionOptions();
        switch (this.tlsEncryptionPolicy) {
            case UNENCRYPTED: {
                return channel -> {};
            }
            case OPTIONAL: {
                logger.debug("Enabling optionally encrypted CQL connections between client and server");
                return channel -> {
                    final SslContext sslContext = SSLFactory.getOrCreateSslContext(encryptionOptions, encryptionOptions.require_client_auth, ISslContextFactory.SocketType.SERVER, SSL_FACTORY_CONTEXT_DESCRIPTION);
                    channel.pipeline().addFirst(SSL_HANDLER, (ChannelHandler)new ByteToMessageDecoder(){

                        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
                            if (byteBuf.readableBytes() < 5) {
                                return;
                            }
                            if (SslHandler.isEncrypted((ByteBuf)byteBuf)) {
                                SslHandler sslHandler = sslContext.newHandler(channel.alloc());
                                channelHandlerContext.pipeline().replace(PipelineConfigurator.SSL_HANDLER, PipelineConfigurator.SSL_HANDLER, (ChannelHandler)sslHandler);
                            } else {
                                channelHandlerContext.pipeline().remove(PipelineConfigurator.SSL_HANDLER);
                            }
                        }
                    });
                };
            }
            case ENCRYPTED: {
                logger.debug("Enabling encrypted CQL connections between client and server");
                return channel -> {
                    SslContext sslContext = SSLFactory.getOrCreateSslContext(encryptionOptions, encryptionOptions.require_client_auth, ISslContextFactory.SocketType.SERVER, SSL_FACTORY_CONTEXT_DESCRIPTION);
                    channel.pipeline().addFirst(SSL_HANDLER, (ChannelHandler)sslContext.newHandler(channel.alloc()));
                };
            }
        }
        throw new IllegalStateException("Unrecognized TLS encryption policy: " + (Object)((Object)this.tlsEncryptionPolicy));
    }

    public void configureInitialPipeline(final Channel channel, Connection.Factory connectionFactory) {
        long idleTimeout;
        ChannelPipeline pipeline = channel.pipeline();
        if (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0L || DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0L) {
            pipeline.addFirst(CONNECTION_LIMIT_HANDLER, (ChannelHandler)connectionLimitHandler);
        }
        if ((idleTimeout = DatabaseDescriptor.nativeTransportIdleTimeout()) > 0L) {
            pipeline.addLast(IDLE_STATE_HANDLER, (ChannelHandler)new IdleStateHandler(false, 0L, 0L, idleTimeout, TimeUnit.MILLISECONDS){

                protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
                    logger.info("Closing client connection {} after timeout of {}ms", (Object)channel.remoteAddress(), (Object)idleTimeout);
                    ctx.close();
                }
            });
        }
        if (DEBUG) {
            pipeline.addLast(DEBUG_HANDLER, (ChannelHandler)new LoggingHandler(LogLevel.INFO));
        }
        pipeline.addLast(ENVELOPE_ENCODER, (ChannelHandler)Envelope.Encoder.instance);
        pipeline.addLast(INITIAL_HANDLER, (ChannelHandler)new InitialConnectionHandler(new Envelope.Decoder(), connectionFactory, this));
        pipeline.addLast(EXCEPTION_HANDLER, (ChannelHandler)PreV5Handlers.ExceptionHandler.instance);
        this.onInitialPipelineReady(pipeline);
    }

    public void configureModernPipeline(ChannelHandlerContext ctx, ServerConnection serverConnection, ClientResourceLimits.Allocator resourceAllocator, ProtocolVersion version, Map<String, String> options) {
        GlobalBufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
        ctx.channel().config().setOption(ChannelOption.ALLOCATOR, (Object)allocator);
        String compression = options.get("COMPRESSION");
        FrameDecoder frameDecoder = this.frameDecoder(compression, allocator);
        FrameEncoder frameEncoder = this.frameEncoder(compression);
        FrameEncoder.PayloadAllocator payloadAllocator = frameEncoder.allocator();
        ChannelInboundHandlerAdapter exceptionHandler = ExceptionHandlers.postV5Handler(payloadAllocator, version);
        Message.Decoder<Message.Request> messageDecoder = this.messageDecoder();
        Envelope.Decoder envelopeDecoder = new Envelope.Decoder();
        ChannelPipeline pipeline = ctx.channel().pipeline();
        ChannelHandlerContext firstContext = pipeline.firstContext();
        CQLMessageHandler.ErrorHandler errorHandler = arg_0 -> ((ChannelHandlerContext)firstContext).fireExceptionCaught(arg_0);
        int queueCapacity = DatabaseDescriptor.getNativeTransportReceiveQueueCapacityInBytes();
        ClientResourceLimits.ResourceProvider resourceProvider = this.resourceProvider(resourceAllocator);
        AbstractMessageHandler.OnHandlerClosed onClosed = handler -> resourceProvider.release();
        String fromOptions = options.get("THROW_ON_OVERLOAD");
        boolean throwOnOverload = fromOptions == null ? DatabaseDescriptor.getNativeTransportThrowOnOverload() : "1".equals(fromOptions);
        CQLMessageHandler.MessageConsumer<Message.Request> messageConsumer = this.messageConsumer();
        CQLMessageHandler<Message.Request> processor = new CQLMessageHandler<Message.Request>(ctx.channel(), serverConnection, version, frameDecoder, envelopeDecoder, messageDecoder, messageConsumer, payloadAllocator, queueCapacity, this.queueBackpressure, resourceProvider, onClosed, errorHandler, throwOnOverload);
        pipeline.remove(ENVELOPE_ENCODER);
        pipeline.addBefore(INITIAL_HANDLER, FRAME_DECODER, (ChannelHandler)frameDecoder);
        pipeline.addBefore(INITIAL_HANDLER, FRAME_ENCODER, (ChannelHandler)frameEncoder);
        pipeline.addBefore(INITIAL_HANDLER, MESSAGE_PROCESSOR, processor);
        pipeline.replace(EXCEPTION_HANDLER, EXCEPTION_HANDLER, (ChannelHandler)exceptionHandler);
        pipeline.remove(INITIAL_HANDLER);
        ctx.channel().attr(Dispatcher.EVENT_DISPATCHER).set(this.dispatcher.eventDispatcher(ctx.channel(), version, payloadAllocator));
        this.onNegotiationComplete(pipeline);
    }

    protected void onInitialPipelineReady(ChannelPipeline pipeline) {
    }

    protected void onNegotiationComplete(ChannelPipeline pipeline) {
    }

    protected ClientResourceLimits.ResourceProvider resourceProvider(ClientResourceLimits.Allocator allocator) {
        return new ClientResourceLimits.ResourceProvider.Default(allocator);
    }

    protected Dispatcher dispatcher(boolean useLegacyFlusher) {
        return new Dispatcher(useLegacyFlusher);
    }

    protected CQLMessageHandler.MessageConsumer<Message.Request> messageConsumer() {
        return this.dispatcher;
    }

    protected Message.Decoder<Message.Request> messageDecoder() {
        return Message.requestDecoder();
    }

    protected FrameDecoder frameDecoder(String compression, BufferPoolAllocator allocator) {
        if (null == compression) {
            return FrameDecoderCrc.create(allocator);
        }
        if (compression.equalsIgnoreCase("LZ4")) {
            return FrameDecoderLZ4.fast(allocator);
        }
        throw new ProtocolException("Unsupported compression type: " + compression);
    }

    protected FrameEncoder frameEncoder(String compression) {
        if (Strings.isNullOrEmpty(compression)) {
            return FrameEncoderCrc.instance;
        }
        if (compression.equalsIgnoreCase("LZ4")) {
            return FrameEncoderLZ4.fastInstance;
        }
        throw new ProtocolException("Unsupported compression type: " + compression);
    }

    public void configureLegacyPipeline(ChannelHandlerContext ctx, ClientResourceLimits.Allocator limits) {
        ChannelPipeline pipeline = ctx.channel().pipeline();
        pipeline.addBefore(ENVELOPE_ENCODER, ENVELOPE_DECODER, (ChannelHandler)new Envelope.Decoder());
        pipeline.addBefore(INITIAL_HANDLER, MESSAGE_DECOMPRESSOR, (ChannelHandler)Envelope.Decompressor.instance);
        pipeline.addBefore(INITIAL_HANDLER, MESSAGE_COMPRESSOR, (ChannelHandler)Envelope.Compressor.instance);
        pipeline.addBefore(INITIAL_HANDLER, MESSAGE_DECODER, (ChannelHandler)PreV5Handlers.ProtocolDecoder.instance);
        pipeline.addBefore(INITIAL_HANDLER, MESSAGE_ENCODER, (ChannelHandler)PreV5Handlers.ProtocolEncoder.instance);
        pipeline.addBefore(INITIAL_HANDLER, LEGACY_MESSAGE_PROCESSOR, (ChannelHandler)new PreV5Handlers.LegacyDispatchHandler(this.dispatcher, this.queueBackpressure, limits));
        pipeline.remove(INITIAL_HANDLER);
        this.onNegotiationComplete(pipeline);
    }

    static interface EncryptionConfig {
        public void applyTo(Channel var1) throws Exception;
    }
}

