/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.haproxy.HAProxyCommand;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.tls.TlsHostnameVerifier;
import org.apache.pulsar.proxy.server.ParserProxyHandler;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyConnection;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Connects one client-facing ("inbound") proxy channel to one broker-facing ("outbound") channel.
 *
 * <p>The constructor starts an asynchronous connect to the target broker on the inbound channel's
 * event loop. The nested {@link ProxyBackendHandler} performs the Pulsar connect/auth handshake
 * with the broker; once the broker answers with {@code Connected}, everything read from the
 * broker is written to the inbound channel as-is.
 *
 * <p>Both channels are driven with {@code AUTO_READ} disabled on the outbound side: a new read is
 * requested explicitly after each step completes.
 */
public class DirectProxyHandler {
    /** Name under which the TLS handler is registered in the outbound pipeline. */
    public static final String TLS_HANDLER = "tls";

    /** ASCII bytes of the literal {@code PROXY} that starts a text PROXY-protocol header. */
    static final byte[] TEXT_PREFIX = new byte[]{'P', 'R', 'O', 'X', 'Y'};

    /**
     * Outbound channel id -> inbound channel id. Only populated when the proxy log level is 2.
     * Entries are added when the broker connection is established and removed again when the
     * outbound channel becomes inactive.
     */
    protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap<ChannelId, ChannelId>();

    private static final Logger log = LoggerFactory.getLogger(DirectProxyHandler.class);

    /** Max message size (5 MiB) assumed when the broker's Connected response does not carry one. */
    private static final int FALLBACK_MAX_MESSAGE_SIZE = 5 * 1024 * 1024;

    /** Extra bytes (10 KiB) allowed on top of the max message size when sizing frame decoders. */
    private static final int FRAME_SIZE_PADDING = 10 * 1024;

    /** Initial capacity of the buffer holding an encoded PROXY-protocol header. */
    private static final int PROXY_HEADER_INITIAL_CAPACITY = 108;

    private final Channel inboundChannel;
    Channel outboundChannel;
    private final Rate inboundChannelRequestsRate;
    private final String originalPrincipal;
    private final AuthData clientAuthData;
    private final String clientAuthMethod;
    private final Authentication authentication;
    private AuthenticationDataProvider authenticationDataProvider;
    private final ProxyService service;
    private final Runnable onHandshakeCompleteAction;

    /**
     * Creates the handler and immediately starts connecting to the broker.
     *
     * <p>If {@code targetBrokerUrl} cannot be parsed, or the connect attempt fails, the inbound
     * channel is closed. In the parse-failure case no connect is attempted and
     * {@link #getOutboundChannel()} stays {@code null}.
     *
     * @param service             owning proxy service (configuration and log level are read from it)
     * @param proxyConnection     the client-side connection; supplies the inbound channel, the
     *                            client's auth role/data/method and an optional HAProxy message
     * @param targetBrokerUrl     broker {@code host[:port]}; only its host part is used, as the
     *                            remote host name for authentication and TLS hostname checks
     * @param targetBrokerAddress socket address actually connected to
     * @param protocolVersion     Pulsar protocol version sent to the broker
     * @param sslHandlerSupplier  supplies the TLS handler for the outbound pipeline, or
     *                            {@code null} when the broker connection is not TLS
     */
    public DirectProxyHandler(final ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl, InetSocketAddress targetBrokerAddress, final int protocolVersion, final Supplier<SslHandler> sslHandlerSupplier) {
        this.service = service;
        this.authentication = proxyConnection.getClientAuthentication();
        this.inboundChannel = proxyConnection.ctx().channel();
        this.inboundChannelRequestsRate = new Rate();
        this.originalPrincipal = proxyConnection.clientAuthRole;
        this.clientAuthData = proxyConnection.clientAuthData;
        this.clientAuthMethod = proxyConnection.clientAuthMethod;
        this.onHandshakeCompleteAction = () -> proxyConnection.cancelKeepAliveTask();
        final ProxyConfiguration config = service.getConfiguration();

        Bootstrap b = new Bootstrap();
        b.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        int brokerProxyConnectTimeoutMs = config.getBrokerProxyConnectTimeoutMs();
        if (brokerProxyConnectTimeoutMs > 0) {
            b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, brokerProxyConnectTimeoutMs);
        }
        // Reuse the inbound channel's event loop and channel type; reads on the outbound
        // channel are requested explicitly (AUTO_READ off).
        b.group(this.inboundChannel.eventLoop())
                .channel(this.inboundChannel.getClass())
                .option(ChannelOption.AUTO_READ, false);
        b.handler(new ChannelInitializer<SocketChannel>() {

            protected void initChannel(SocketChannel ch) {
                if (sslHandlerSupplier != null) {
                    ch.pipeline().addLast(DirectProxyHandler.TLS_HANDLER, sslHandlerSupplier.get());
                }
                int brokerProxyReadTimeoutMs = service.getConfiguration().getBrokerProxyReadTimeoutMs();
                if (brokerProxyReadTimeoutMs > 0) {
                    ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
                }
                ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(FALLBACK_MAX_MESSAGE_SIZE + FRAME_SIZE_PADDING, 0, 4, 0, 4));
                ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
            }
        });

        final URI targetBroker;
        try {
            // Parsed only to extract the host part.
            targetBroker = new URI("pulsar://" + targetBrokerUrl);
        } catch (URISyntaxException e) {
            log.warn("[{}] Failed to parse broker url '{}'", this.inboundChannel, targetBrokerUrl, e);
            this.inboundChannel.close();
            return;
        }

        ChannelFuture f = b.connect(targetBrokerAddress);
        this.outboundChannel = f.channel();
        f.addListener(future -> {
            if (!future.isSuccess()) {
                log.warn("[{}] Establishing connection to {} ({}) failed. Closing inbound channel.", this.inboundChannel, targetBrokerAddress, targetBrokerUrl, future.cause());
                this.inboundChannel.close();
                return;
            }
            ProxyBackendHandler cnx = (ProxyBackendHandler) this.outboundChannel.pipeline().get("proxyOutboundHandler");
            cnx.setRemoteHostName(targetBroker.getHost());
            if (service.getProxyLogLevel() == 2) {
                inboundOutboundChannelMap.put(this.outboundChannel.id(), this.inboundChannel.id());
            }
            if (config.isHaProxyProtocolEnabled()) {
                this.writeProxyProtocolHeader(proxyConnection);
            }
        });
    }

    /**
     * Sends a PROXY-protocol header to the broker.
     *
     * <p>If the client connection already carries an HAProxy message, that message is forwarded.
     * Otherwise a V1/TCP4 message is built with the client's remote address as source and the
     * local address of the broker connection as destination. Nothing is sent when either of
     * those addresses is not an {@link InetSocketAddress}.
     */
    private void writeProxyProtocolHeader(ProxyConnection proxyConnection) {
        if (proxyConnection.hasHAProxyMessage()) {
            this.outboundChannel.writeAndFlush(this.encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
            return;
        }
        if (!(this.inboundChannel.remoteAddress() instanceof InetSocketAddress)
                || !(this.outboundChannel.localAddress() instanceof InetSocketAddress)) {
            return;
        }
        InetSocketAddress clientAddress = (InetSocketAddress) this.inboundChannel.remoteAddress();
        // Destination is the proxy side of the broker connection -- the same address the guard
        // above validates. (Previously the client address was used here as well, which made
        // source and destination identical.)
        InetSocketAddress proxyAddress = (InetSocketAddress) this.outboundChannel.localAddress();
        // NOTE(review): the protocol is fixed to TCP4; IPv6 peers are not handled here -- confirm
        // whether they need to be.
        HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY, HAProxyProxiedProtocol.TCP4,
                clientAddress.getAddress().getHostAddress(), proxyAddress.getAddress().getHostAddress(),
                clientAddress.getPort(), proxyAddress.getPort());
        try {
            // The encoder copies the fields into a fresh buffer, so the message can be released
            // as soon as the write has been issued.
            this.outboundChannel.writeAndFlush(this.encodeProxyProtocolMessage(msg));
        } finally {
            msg.release();
        }
    }

    /**
     * Encodes {@code msg} as a single text line:
     * {@code PROXY <protocol> <srcAddr> <dstAddr> <srcPort> <dstPort>\r\n}.
     *
     * @return a newly allocated buffer containing the encoded line
     */
    private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
        ByteBuf out = Unpooled.buffer(PROXY_HEADER_INITIAL_CAPACITY);
        out.writeBytes(TEXT_PREFIX);
        out.writeByte(' ');
        out.writeCharSequence(msg.proxiedProtocol().name(), CharsetUtil.US_ASCII);
        out.writeByte(' ');
        out.writeCharSequence(msg.sourceAddress(), CharsetUtil.US_ASCII);
        out.writeByte(' ');
        out.writeCharSequence(msg.destinationAddress(), CharsetUtil.US_ASCII);
        out.writeByte(' ');
        out.writeCharSequence(String.valueOf(msg.sourcePort()), CharsetUtil.US_ASCII);
        out.writeByte(' ');
        out.writeCharSequence(String.valueOf(msg.destinationPort()), CharsetUtil.US_ASCII);
        out.writeByte('\r');
        out.writeByte('\n');
        return out;
    }

    /** @return the client-facing channel */
    public Channel getInboundChannel() {
        return this.inboundChannel;
    }

    /** @return the broker-facing channel, or {@code null} if no connect was attempted */
    public Channel getOutboundChannel() {
        return this.outboundChannel;
    }

    /** @return the rate object created for this handler's inbound channel */
    public Rate getInboundChannelRequestsRate() {
        return this.inboundChannelRequestsRate;
    }

    /**
     * Last handler of the outbound (broker) pipeline.
     *
     * <p>While in {@link BackendState#Init} incoming frames are decoded as Pulsar commands to
     * drive the connect/auth handshake. After the broker's {@code Connected} response the state
     * becomes {@link BackendState#HandshakeCompleted} and every message read from the broker is
     * written to the inbound channel unchanged. The handler also serves as the write listener
     * for those forwarded messages: a successful write triggers the next broker read.
     */
    public class ProxyBackendHandler
    extends PulsarDecoder
    implements FutureListener<Void> {
        private BackendState state = BackendState.Init;
        private String remoteHostName;
        protected ChannelHandlerContext ctx;
        private final ProxyConfiguration config;
        private final int protocolVersion;

        public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
            this.config = config;
            this.protocolVersion = protocolVersion;
        }

        /**
         * Sends the Pulsar {@code Connect} command (carrying the proxy's own auth data plus the
         * original client's principal and auth data) and requests the first read.
         */
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            this.ctx = ctx;
            DirectProxyHandler.this.authenticationDataProvider = DirectProxyHandler.this.authentication.getAuthData(this.remoteHostName);
            AuthData authData = DirectProxyHandler.this.authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
            ByteBuf command = Commands.newConnect(DirectProxyHandler.this.authentication.getAuthMethodName(), authData, this.protocolVersion, "Pulsar proxy", null, DirectProxyHandler.this.originalPrincipal, DirectProxyHandler.this.clientAuthData, DirectProxyHandler.this.clientAuthMethod);
            DirectProxyHandler.this.outboundChannel.writeAndFlush(command);
            DirectProxyHandler.this.outboundChannel.read();
        }

        /**
         * During the handshake, delegates to the decoder; afterwards, counts the message and
         * forwards it to the inbound channel with this handler as completion listener.
         */
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            switch (this.state) {
                case Init: {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Received msg on broker connection: {}", DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, msg.getClass());
                    }
                    super.channelRead(ctx, msg);
                    break;
                }
                case HandshakeCompleted: {
                    ProxyService.OPS_COUNTER.inc();
                    if (msg instanceof ByteBuf) {
                        ProxyService.BYTES_COUNTER.inc(((ByteBuf) msg).readableBytes());
                    }
                    DirectProxyHandler.this.inboundChannel.writeAndFlush(msg).addListener(this);
                    break;
                }
            }
        }

        /**
         * Answers an auth challenge from the broker. A challenge whose payload equals
         * {@code AuthData.REFRESH_AUTH_DATA_BYTES} first re-creates the authentication data
         * provider.
         *
         * <p>If the provider cannot be refreshed or the response cannot be produced, the broker
         * connection is closed: no further read is requested on this channel, so leaving it open
         * would stall the handshake.
         */
        protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
            Preconditions.checkArgument(authChallenge.hasChallenge());
            Preconditions.checkArgument(authChallenge.getChallenge().hasAuthData());
            if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
                try {
                    DirectProxyHandler.this.authenticationDataProvider = DirectProxyHandler.this.authentication.getAuthData(this.remoteHostName);
                } catch (PulsarClientException e) {
                    log.error("{} Error when refreshing authentication data provider: {}", this.ctx.channel(), e);
                    this.ctx.close();
                    return;
                }
            }
            try {
                AuthData authData = DirectProxyHandler.this.authenticationDataProvider.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
                Preconditions.checkState(!authData.isComplete());
                ByteBuf request = Commands.newAuthResponse(DirectProxyHandler.this.authentication.getAuthMethodName(), authData, this.protocolVersion, PulsarVersion.getVersion());
                if (log.isDebugEnabled()) {
                    log.debug("{} Mutual auth {}", this.ctx.channel(), DirectProxyHandler.this.authentication.getAuthMethodName());
                }
                DirectProxyHandler.this.outboundChannel.writeAndFlush(request);
                DirectProxyHandler.this.outboundChannel.read();
            } catch (Exception e) {
                log.error("Error mutual verify", e);
                this.ctx.close();
            }
        }

        /**
         * Completion callback for writes to the inbound channel: on success the next message is
         * read from the broker, on failure the inbound channel is closed.
         */
        public void operationComplete(Future<Void> future) {
            if (future.isSuccess()) {
                DirectProxyHandler.this.outboundChannel.read();
            } else {
                log.warn("[{}] [{}] Failed to write on proxy connection. Closing both connections.", DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, future.cause());
                DirectProxyHandler.this.inboundChannel.close();
            }
        }

        /** Intentionally empty. */
        protected void messageReceived() {
        }

        /**
         * Completes the handshake: optionally verifies the broker's TLS host name, switches to
         * forwarding mode, adjusts both pipelines, and relays a {@code Connected} response to the
         * client. Reads on both channels are requested once that response has been written.
         */
        protected void handleConnected(CommandConnected connected) {
            Preconditions.checkArgument(this.state == BackendState.Init, "Unexpected state %s. BackendState.Init was expected.", this.state);
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received Connected from broker", DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel);
            }
            if (this.config.isTlsHostnameVerificationEnabled() && this.remoteHostName != null && !this.verifyTlsHostName(this.remoteHostName, this.ctx)) {
                log.warn("[{}] Failed to verify hostname of {}", this.ctx.channel(), this.remoteHostName);
                this.ctx.close();
                return;
            }
            this.state = BackendState.HandshakeCompleted;
            DirectProxyHandler.this.onHandshakeCompleteAction.run();
            this.startDirectProxying(connected);
            int maxMessageSize = connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : -1;
            DirectProxyHandler.this.inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize)).addListener(future -> {
                if (future.isSuccess()) {
                    DirectProxyHandler.this.inboundChannel.read();
                    DirectProxyHandler.this.outboundChannel.read();
                } else {
                    log.warn("[{}] [{}] Failed to write to inbound connection. Closing both connections.", DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, future.cause());
                    DirectProxyHandler.this.inboundChannel.close();
                }
            });
        }

        /**
         * Reconfigures both pipelines for forwarding.
         *
         * <p>At proxy log level 0 the frame decoders are removed from both pipelines. At any other
         * level a {@link ParserProxyHandler} is inserted on each side; when the broker advertised
         * a max message size the frame decoders are additionally replaced with ones sized for
         * that value plus padding.
         */
        private void startDirectProxying(CommandConnected connected) {
            Channel inbound = DirectProxyHandler.this.inboundChannel;
            Channel outbound = DirectProxyHandler.this.outboundChannel;
            if (DirectProxyHandler.this.service.getProxyLogLevel() == 0) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Removing decoder from pipeline", inbound, outbound);
                }
                inbound.pipeline().remove("frameDecoder");
                outbound.pipeline().remove("frameDecoder");
                return;
            }

            int maxMessageSize;
            if (connected.hasMaxMessageSize()) {
                maxMessageSize = connected.getMaxMessageSize();
                // Each pipeline needs its own decoder instance.
                inbound.pipeline().replace("frameDecoder", "newFrameDecoder", new LengthFieldBasedFrameDecoder(maxMessageSize + FRAME_SIZE_PADDING, 0, 4, 0, 4));
                outbound.pipeline().replace("frameDecoder", "newFrameDecoder", new LengthFieldBasedFrameDecoder(maxMessageSize + FRAME_SIZE_PADDING, 0, 4, 0, 4));
            } else {
                maxMessageSize = FALLBACK_MAX_MESSAGE_SIZE;
            }
            inbound.pipeline().addBefore("handler", "inboundParser", new ParserProxyHandler(DirectProxyHandler.this.service, inbound, "frontendconn", maxMessageSize));
            outbound.pipeline().addBefore("proxyOutboundHandler", "outboundParser", new ParserProxyHandler(DirectProxyHandler.this.service, outbound, "backendconn", maxMessageSize));
        }

        /**
         * Broker connection went away: drop this channel's entry from
         * {@link DirectProxyHandler#inboundOutboundChannelMap} (a no-op if none was registered)
         * and close the client connection.
         */
        public void channelInactive(ChannelHandlerContext ctx) {
            // Without this the static map would keep one entry per closed connection.
            inboundOutboundChannelMap.remove(ctx.channel().id());
            DirectProxyHandler.this.inboundChannel.close();
        }

        /** Logs the failure and closes the broker connection. */
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            log.warn("[{}] [{}] Caught exception: {}", DirectProxyHandler.this.inboundChannel, DirectProxyHandler.this.outboundChannel, cause.getMessage(), cause);
            ctx.close();
        }

        /** Sets the broker host name used for authentication data and TLS hostname verification. */
        public void setRemoteHostName(String remoteHostName) {
            this.remoteHostName = remoteHostName;
        }

        /**
         * Verifies {@code hostname} against the TLS session of this channel.
         *
         * @return the verifier's result, or {@code false} when the pipeline has no TLS handler
         */
        private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
            ChannelHandler sslHandler = ctx.channel().pipeline().get(DirectProxyHandler.TLS_HANDLER);
            if (sslHandler == null) {
                return false;
            }
            SSLSession sslSession = ((SslHandler) sslHandler).engine().getSession();
            return new TlsHostnameVerifier().verify(hostname, sslSession);
        }
    }

    /** Handshake progress of the broker connection. */
    static enum BackendState {
        /** Connect/auth exchange with the broker still in progress. */
        Init,
        /** Broker answered with Connected; traffic is forwarded as-is. */
        HandshakeCompleted;

    }
}

