package org.apache.pulsar.broker.service;

import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Cache;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.shade.io.netty.channel.ChannelInitializer;
import org.apache.pulsar.shade.io.netty.channel.socket.SocketChannel;
import org.apache.pulsar.shade.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.pulsar.shade.io.netty.handler.flow.FlowControlHandler;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.NettySslContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/PulsarChannelInitializer.class */
/**
 * Netty {@link ChannelInitializer} that assembles the broker-side pipeline for every accepted
 * {@link SocketChannel}: an optional TLS handler, the {@link ByteBufPair} encoder, a
 * length-prefixed frame decoder, a {@link FlowControlHandler} and finally a per-connection
 * {@link ServerCnx}.
 *
 * <p>Each created {@link ServerCnx} is also remembered in a weakly-referenced cache so that a task
 * scheduled on the broker executor can periodically ask every known connection to refresh its
 * authentication credentials.
 */
public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
    private static final Logger log = LoggerFactory.getLogger(PulsarChannelInitializer.class);

    /** Pipeline name under which the TLS handler is registered. */
    public static final String TLS_HANDLER = "tls";

    private final PulsarService pulsar;
    private final boolean enableTls;

    /** Source of the SSL context used for new channels; {@code null} when TLS is disabled. */
    private final NettySslContextBuilder sslCtxRefresher;

    private final ServiceConfiguration brokerConf;

    /**
     * Connections created by this initializer, keyed by the remote address seen at init time.
     * Built with weak keys and weak values, so the cache itself does not keep a
     * {@link ServerCnx} strongly reachable.
     */
    private final Cache<SocketAddress, ServerCnx> connections = Caffeine.newBuilder().weakKeys().weakValues().build();

    /**
     * Creates the initializer and schedules the periodic authentication-credential refresh.
     *
     * @param pulsarService broker service providing the configuration and the executor on which
     *                      the refresh task is scheduled
     * @param enableTls     whether channels initialized by this instance get a TLS handler
     * @throws Exception if the TLS context builder cannot be created
     */
    public PulsarChannelInitializer(PulsarService pulsarService, boolean enableTls) throws Exception {
        this.pulsar = pulsarService;
        this.enableTls = enableTls;
        this.brokerConf = pulsarService.getConfiguration();
        if (enableTls) {
            this.sslCtxRefresher = new NettySslContextBuilder(
                    this.brokerConf.isTlsAllowInsecureConnection(),
                    this.brokerConf.getTlsTrustCertsFilePath(),
                    this.brokerConf.getTlsCertificateFilePath(),
                    this.brokerConf.getTlsKeyFilePath(),
                    this.brokerConf.getTlsCiphers(),
                    this.brokerConf.getTlsProtocols(),
                    this.brokerConf.isTlsRequireTrustedClientCertOnConnect(),
                    this.brokerConf.getTlsCertRefreshCheckDurationSec());
        } else {
            this.sslCtxRefresher = null;
        }

        // The same interval is used as both the initial delay and the period.
        long refreshCheckSeconds = pulsarService.getConfig().getAuthenticationRefreshCheckSeconds();
        pulsarService.getExecutor().scheduleAtFixedRate(
                SafeRunnable.safeRun(this::refreshAuthenticationCredentials),
                refreshCheckSeconds,
                refreshCheckSeconds,
                TimeUnit.SECONDS);
    }

    /**
     * Builds the handler pipeline for a newly accepted channel and registers its
     * {@link ServerCnx} for later credential refreshes.
     *
     * <p>Declared {@code public} here; the overridden Netty method is {@code protected}.
     *
     * @param socketChannel the channel being initialized
     * @throws Exception if any handler cannot be created or added
     */
    @Override
    public void initChannel(SocketChannel socketChannel) throws Exception {
        if (this.enableTls) {
            socketChannel.pipeline().addLast(TLS_HANDLER, this.sslCtxRefresher.get().newHandler(socketChannel.alloc()));
            // TLS connections use the copying variant of the ByteBufPair encoder.
            socketChannel.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
        } else {
            socketChannel.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
        }

        // Frames carry a 4-byte length prefix at offset 0, which is stripped before the frame is
        // passed on. The maximum frame length is the configured max message size plus 10240 bytes
        // of headroom (presumably for command/metadata overhead -- TODO confirm).
        socketChannel.pipeline().addLast("frameDecoder",
                new LengthFieldBasedFrameDecoder(this.brokerConf.getMaxMessageSize() + 10240, 0, 4, 0, 4));
        socketChannel.pipeline().addLast("flowController", new FlowControlHandler());

        ServerCnx serverCnx = new ServerCnx(this.pulsar);
        socketChannel.pipeline().addLast("handler", serverCnx);
        this.connections.put(socketChannel.remoteAddress(), serverCnx);
    }

    /**
     * Asks every connection still present in the cache to refresh its authentication
     * credentials. A failure on one connection is logged and does not stop the others.
     */
    private void refreshAuthenticationCredentials() {
        this.connections.asMap().values().forEach(serverCnx -> {
            try {
                serverCnx.refreshAuthenticationCredentials();
            } catch (Throwable th) {
                // Pass the throwable as the trailing argument so the cause and stack trace are logged.
                log.warn("[{}] Failed to refresh auth credentials", serverCnx.getRemoteAddress(), th);
            }
        });
    }
}
