package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/proxy/server/ProxyConnection.class */
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
    private ProxyService service;
    String clientAuthRole;
    private State state;
    private LookupProxyHandler lookupProxyHandler;
    private DirectProxyHandler directProxyHandler;
    private static final Gauge activeConnections = Gauge.build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create().register();
    private static final Counter newConnections = Counter.build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create().register();
    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/proxy/server/ProxyConnection$State.class */
    public enum State {
        Init,
        ProxyLookupRequests,
        ProxyConnectionToBroker,
        Closed
    }

    public ProxyConnection(ProxyService proxyService) {
        super(30, TimeUnit.SECONDS);
        this.clientAuthRole = null;
        this.lookupProxyHandler = null;
        this.directProxyHandler = null;
        this.service = proxyService;
        this.state = State.Init;
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        activeConnections.inc();
        newConnections.inc();
        LOG.info("[{}] New connection opened", this.remoteAddress);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        activeConnections.dec();
        if (this.directProxyHandler == null || this.directProxyHandler.outboundChannel == null) {
            return;
        }
        this.directProxyHandler.outboundChannel.close();
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        switch (this.state) {
            case Init:
            case ProxyLookupRequests:
                super.channelRead(channelHandlerContext, obj);
                return;
            case ProxyConnectionToBroker:
                this.directProxyHandler.outboundChannel.writeAndFlush(obj).addListener(this);
                return;
            default:
                return;
        }
    }

    public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
            this.ctx.read();
        } else {
            LOG.warn("[{}] Error in writing to inbound channel. Closing", this.remoteAddress, future.cause());
            this.directProxyHandler.outboundChannel.close();
        }
    }

    protected void handleConnect(PulsarApi.CommandConnect commandConnect) {
        Preconditions.checkArgument(this.state == State.Init);
        this.remoteEndpointProtocolVersion = commandConnect.getProtocolVersion();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received CONNECT from {} proxyToBroker={}", this.remoteAddress, commandConnect.hasProxyToBrokerUrl() ? commandConnect.getProxyToBrokerUrl() : "null");
        }
        if (this.remoteEndpointProtocolVersion < 10) {
            LOG.warn("[{}] Client doesn't support connecting through proxy", this.remoteAddress);
            this.ctx.close();
            return;
        }
        if (!verifyAuthenticationIfNeeded(commandConnect)) {
            this.ctx.writeAndFlush(Commands.newError(-1L, PulsarApi.ServerError.AuthenticationError, "Failed to authenticate"));
            close();
        } else if (commandConnect.hasProxyToBrokerUrl()) {
            this.state = State.ProxyConnectionToBroker;
            this.directProxyHandler = new DirectProxyHandler(this.service, this, commandConnect.getProxyToBrokerUrl());
        } else {
            this.state = State.ProxyLookupRequests;
            this.lookupProxyHandler = new LookupProxyHandler(this.service, this);
            this.ctx.writeAndFlush(Commands.newConnected(commandConnect.getProtocolVersion()));
        }
    }

    protected void handlePartitionMetadataRequest(PulsarApi.CommandPartitionedTopicMetadata commandPartitionedTopicMetadata) {
        Preconditions.checkArgument(this.state == State.ProxyLookupRequests);
        this.lookupProxyHandler.handlePartitionMetadataResponse(commandPartitionedTopicMetadata);
    }

    protected void handleLookup(PulsarApi.CommandLookupTopic commandLookupTopic) {
        Preconditions.checkArgument(this.state == State.ProxyLookupRequests);
        this.lookupProxyHandler.handleLookup(commandLookupTopic);
    }

    private void close() {
        this.state = State.Closed;
        this.ctx.close();
    }

    private boolean verifyAuthenticationIfNeeded(PulsarApi.CommandConnect commandConnect) {
        if (!this.service.getConfiguration().isAuthenticationEnabled()) {
            return true;
        }
        try {
            String str = "none";
            if (commandConnect.hasAuthMethodName()) {
                str = commandConnect.getAuthMethodName();
            } else if (commandConnect.hasAuthMethod()) {
                str = commandConnect.getAuthMethod().name().substring(10).toLowerCase();
            }
            String stringUtf8 = commandConnect.getAuthData().toStringUtf8();
            SslHandler sslHandler = this.ctx.channel().pipeline().get("tls");
            SSLSession sSLSession = null;
            if (sslHandler != null) {
                sSLSession = sslHandler.engine().getSession();
            }
            this.clientAuthRole = this.service.getAuthenticationService().authenticate(new AuthenticationDataCommand(stringUtf8, this.remoteAddress, sSLSession), str);
            LOG.info("[{}] Client successfully authenticated with {} role {}", new Object[]{this.remoteAddress, str, this.clientAuthRole});
            return true;
        } catch (AuthenticationException e) {
            LOG.warn("[{}] Unable to authenticate: {}", this.remoteAddress, e.getMessage());
            return false;
        }
    }

    protected boolean isHandshakeCompleted() {
        return this.state != State.Init;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SocketAddress clientAddress() {
        return this.remoteAddress;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelHandlerContext ctx() {
        return this.ctx;
    }
}
