package org.apache.hadoop.ozone.container.replication;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.PostConstruct;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * gRPC server used for datanode-to-datanode container replication.
 *
 * <p>The server exposes a {@code GrpcReplicationService} and runs its calls on
 * a fixed-size thread pool backed by a bounded queue. Both sizes come from
 * {@link ReplicationConfig}. When security and gRPC TLS are both enabled the
 * endpoint is configured with mutual TLS.
 */
public class ReplicationServer {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationServer.class);

    /** Largest inbound gRPC message accepted by the server: 32 MiB. */
    private static final int MAX_INBOUND_MESSAGE_SIZE = 32 * 1024 * 1024;

    /** How long {@link #stop()} waits for the request executor to drain. */
    private static final long EXECUTOR_STOP_TIMEOUT_SECONDS = 5L;

    /** How long {@link #stop()} waits for the gRPC server to terminate. */
    private static final long SERVER_STOP_TIMEOUT_SECONDS = 10L;

    private Server server;
    private final SecurityConfig secConf;
    private final CertificateClient caClient;
    private final ContainerController controller;
    private int port;
    private final ContainerImporter importer;
    private final ThreadPoolExecutor executor;

    /**
     * Replication related configuration, bound to the
     * {@code hdds.datanode.replication} prefix.
     */
    @ConfigGroup(prefix = ReplicationConfig.PREFIX)
    public static final class ReplicationConfig {
        public static final String PREFIX = "hdds.datanode.replication";
        public static final String STREAMS_LIMIT_KEY = "streams.limit";
        public static final String QUEUE_LIMIT = "queue.limit";
        public static final String REPLICATION_STREAMS_LIMIT_KEY = PREFIX + "." + STREAMS_LIMIT_KEY;
        public static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
        private static final String REPLICATION_QUEUE_LIMIT_KEY = PREFIX + "." + QUEUE_LIMIT;
        private static final int REPLICATION_QUEUE_LIMIT_DEFAULT = 4096;
        private static final String OUTOFSERVICE_FACTOR_KEY = "outofservice.limit.factor";
        private static final double OUTOFSERVICE_FACTOR_MIN = 1.0d;
        static final double OUTOFSERVICE_FACTOR_DEFAULT = 2.0d;
        private static final String OUTOFSERVICE_FACTOR_DEFAULT_VALUE = "2.0";
        private static final double OUTOFSERVICE_FACTOR_MAX = 10.0d;
        static final String REPLICATION_OUTOFSERVICE_FACTOR_KEY = PREFIX + "." + OUTOFSERVICE_FACTOR_KEY;

        @Config(key = "port", defaultValue = "9886", description = "Port used for the server2server replication server", tags = {ConfigTag.DATANODE, ConfigTag.MANAGEMENT})
        private int port;

        @Config(key = STREAMS_LIMIT_KEY, type = ConfigType.INT, defaultValue = "10", tags = {ConfigTag.DATANODE}, description = "The maximum number of replication commands a single datanode can execute simultaneously")
        private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;

        @Config(key = QUEUE_LIMIT, type = ConfigType.INT, defaultValue = "4096", tags = {ConfigTag.DATANODE}, description = "The maximum number of queued requests for container replication")
        private int replicationQueueLimit = REPLICATION_QUEUE_LIMIT_DEFAULT;

        @Config(key = OUTOFSERVICE_FACTOR_KEY, type = ConfigType.DOUBLE, defaultValue = OUTOFSERVICE_FACTOR_DEFAULT_VALUE, tags = {ConfigTag.DATANODE, ConfigTag.SCM}, description = "Decommissioning and maintenance nodes can handle more replication commands than in-service nodes due to reduced load. This multiplier determines the increased queue capacity and executor pool size.")
        private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT;

        /**
         * @return the multiplier applied to limits for out-of-service nodes
         */
        public double getOutOfServiceFactor() {
            return outOfServiceFactor;
        }

        /**
         * Scales a limit by the out-of-service factor.
         *
         * @param i the base limit
         * @return {@code i} multiplied by the factor, rounded up
         */
        public int scaleOutOfServiceLimit(int i) {
            return (int) Math.ceil(i * outOfServiceFactor);
        }

        /**
         * @return the configured replication server port
         */
        public int getPort() {
            return port;
        }

        /**
         * Sets the replication server port.
         *
         * @param i the port to use
         * @return this config, to allow call chaining
         */
        public ReplicationConfig setPort(int i) {
            this.port = i;
            return this;
        }

        /**
         * @return the size of the replication thread pool
         */
        public int getReplicationMaxStreams() {
            return replicationMaxStreams;
        }

        /**
         * Sets the size of the replication thread pool. The value is not
         * validated here; {@link #validate()} is the only place that checks it.
         *
         * @param i the new thread count
         */
        public void setReplicationMaxStreams(int i) {
            this.replicationMaxStreams = i;
        }

        /**
         * @return the capacity of the replication request queue
         */
        public int getReplicationQueueLimit() {
            return replicationQueueLimit;
        }

        /**
         * Sets the capacity of the replication request queue. The value is not
         * validated here; {@link #validate()} is the only place that checks it.
         *
         * @param i the new queue capacity
         */
        public void setReplicationQueueLimit(int i) {
            this.replicationQueueLimit = i;
        }

        /**
         * Replaces out-of-range settings with their defaults, logging a
         * warning for each value that had to be corrected.
         */
        @PostConstruct
        public void validate() {
            if (replicationMaxStreams < 1) {
                LOG.warn("{} must be greater than zero and was set to {}. Defaulting to {}",
                        REPLICATION_STREAMS_LIMIT_KEY, replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT);
                replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
            }
            // A non-positive capacity makes LinkedBlockingQueue throw when the
            // server is constructed, so fall back to the default instead.
            if (replicationQueueLimit < 1) {
                LOG.warn("{} must be greater than zero and was set to {}. Defaulting to {}",
                        REPLICATION_QUEUE_LIMIT_KEY, replicationQueueLimit, REPLICATION_QUEUE_LIMIT_DEFAULT);
                replicationQueueLimit = REPLICATION_QUEUE_LIMIT_DEFAULT;
            }
            // Written as a negated in-range test so that NaN, for which every
            // comparison is false, is rejected as well.
            boolean factorInRange = outOfServiceFactor >= OUTOFSERVICE_FACTOR_MIN
                    && outOfServiceFactor <= OUTOFSERVICE_FACTOR_MAX;
            if (!factorInRange) {
                LOG.warn("{} must be between {} and {} but was set to {}. Defaulting to {}",
                        REPLICATION_OUTOFSERVICE_FACTOR_KEY, OUTOFSERVICE_FACTOR_MIN, OUTOFSERVICE_FACTOR_MAX,
                        outOfServiceFactor, OUTOFSERVICE_FACTOR_DEFAULT);
                outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT;
            }
        }
    }

    /**
     * Creates the request executor and builds (but does not start) the gRPC
     * server.
     *
     * @param containerController controller handed to the replication source
     * @param replicationConfig source of the port, thread count and queue limit
     * @param securityConfig decides whether TLS is configured
     * @param certificateClient provides key and trust managers when TLS is on
     * @param containerImporter importer handed to the replication service
     * @param str prefix for the names of the executor threads
     * @throws IllegalArgumentException if the TLS setup fails, or if the
     *     configured thread count or queue limit is not positive
     */
    public ReplicationServer(ContainerController containerController, ReplicationConfig replicationConfig, SecurityConfig securityConfig, CertificateClient certificateClient, ContainerImporter containerImporter, String str) {
        this.secConf = securityConfig;
        this.caClient = certificateClient;
        this.controller = containerController;
        this.importer = containerImporter;
        this.port = replicationConfig.getPort();

        int replicationMaxStreams = replicationConfig.getReplicationMaxStreams();
        int replicationQueueLimit = replicationConfig.getReplicationQueueLimit();
        LOG.info("Initializing replication server with thread count = {} queue length = {}",
                replicationMaxStreams, replicationQueueLimit);

        // Fixed-size pool (core == max) of daemon threads with a bounded queue.
        this.executor = new ThreadPoolExecutor(
                replicationMaxStreams,
                replicationMaxStreams,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(replicationQueueLimit),
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat(str + "ReplicationContainerReader-%d")
                        .build());

        // NOTE(review): init() is public and overridable yet invoked from the
        // constructor; subclasses overriding it would see a partially
        // constructed object. Kept as is for compatibility.
        init();
    }

    /**
     * Builds the gRPC server for the current port, wiring in the replication
     * service, the tracing interceptor and the request executor. TLS with
     * mandatory client authentication is configured when both security and
     * gRPC TLS are enabled.
     *
     * @throws IllegalArgumentException if the TLS context cannot be created
     */
    public void init() {
        GrpcReplicationService service =
                new GrpcReplicationService(new OnDemandContainerReplicationSource(controller), importer);

        NettyServerBuilder builder = NettyServerBuilder.forPort(port)
                .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
                .addService(ServerInterceptors.intercept(service, new ServerInterceptor[]{new GrpcServerInterceptor()}))
                .executor(executor);

        if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) {
            try {
                SslContextBuilder sslContextBuilder = GrpcSslContexts.configure(
                        SslContextBuilder.forServer(caClient.getServerKeyStoresFactory().getKeyManagers()[0]),
                        secConf.getGrpcSslProvider());
                // Mutual TLS: clients have to present a certificate.
                sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
                sslContextBuilder.trustManager(caClient.getServerKeyStoresFactory().getTrustManagers()[0]);
                builder.sslContext(sslContextBuilder.build());
            } catch (IOException e) {
                throw new IllegalArgumentException("Unable to setup TLS for secure datanode replication GRPC endpoint.", e);
            }
        }
        this.server = builder.build();
    }

    /**
     * Starts the gRPC server and records the port the server reports, which
     * may differ from the configured one (presumably when the configured port
     * is 0 - confirm).
     *
     * @throws IOException if the server cannot be started
     */
    public void start() throws IOException {
        server.start();
        port = server.getPort();
        LOG.info("{} is started using port {}", getClass().getSimpleName(), port);
    }

    /**
     * Shuts down the request executor and the gRPC server, waiting a bounded
     * time for each. If the calling thread is interrupted while waiting, the
     * server shutdown is still initiated and the interrupt flag is restored.
     */
    public void stop() {
        executor.shutdown();
        try {
            try {
                executor.awaitTermination(EXECUTOR_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } finally {
                // Always initiate the server shutdown, even if the wait above
                // was interrupted; otherwise the endpoint would stay open.
                server.shutdown();
            }
            server.awaitTermination(SERVER_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the configured port, or after {@link #start()} the port reported
     *     by the running server
     */
    public int getPort() {
        return port;
    }
}
