package com.linkedin.kafka.cruisecontrol.client;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;
import kafka.cluster.BrokerEndPoint;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.InitiateShutdownRequest;
import org.apache.kafka.common.requests.InitiateShutdownResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

@NotThreadSafe
/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/client/BlockingSendClient.class */
public class BlockingSendClient implements BlockingSend, AutoCloseable {
    private final KafkaConfig config;
    private final int socketTimeout;
    private final Time time;
    private final KafkaClient networkClient;
    private final Optional<? extends Reconfigurable> reconfigurableChannelBuilder;
    private final Node targetNode;

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/client/BlockingSendClient$Builder.class */
    public static class Builder {
        private static final String METRIC_GROUP_PREFIX = "sbk-internal-broker-client";
        private final KafkaConfig config;
        private final Time time;
        private final String clientId;
        private final LogContext logContext;

        public Builder(KafkaConfig kafkaConfig, Time time, String str, LogContext logContext) {
            this.config = kafkaConfig;
            this.time = time;
            this.clientId = str;
            this.logContext = logContext;
        }

        public BlockingSendClient build(BrokerEndPoint brokerEndPoint) {
            Optional empty;
            Reconfigurable clientChannelBuilder = ChannelBuilders.clientChannelBuilder(this.config.interBrokerSecurityProtocol(), JaasContext.Type.SERVER, this.config, this.config.interBrokerListenerName(), this.config.saslMechanismInterBrokerProtocol(), this.time, this.config.saslInterBrokerHandshakeRequestEnable(), this.logContext);
            if (clientChannelBuilder instanceof Reconfigurable) {
                Reconfigurable reconfigurable = clientChannelBuilder;
                this.config.addReconfigurable(reconfigurable);
                empty = Optional.of(reconfigurable);
            } else {
                empty = Optional.empty();
            }
            HashMap hashMap = new HashMap();
            hashMap.put("broker-id", Integer.toString(brokerEndPoint.id()));
            return new BlockingSendClient(new Node(brokerEndPoint.id(), brokerEndPoint.host(), brokerEndPoint.port()), this.config, this.config.controllerSocketTimeoutMs(), this.time, new NetworkClient(new Selector(-1, this.config.connectionsMaxIdleMs().longValue(), new Metrics(), this.time, METRIC_GROUP_PREFIX, hashMap, true, clientChannelBuilder, this.logContext), new ManualMetadataUpdater(), this.clientId, 1, 0L, 0L, -1, this.config.socketReceiveBufferBytes().intValue(), this.config.requestTimeoutMs().intValue(), this.config.connectionSetupTimeoutMs().longValue(), this.config.connectionSetupTimeoutMaxMs().longValue(), this.time, false, new ApiVersions(), this.logContext), empty);
        }
    }

    BlockingSendClient(Node node, KafkaConfig kafkaConfig, int i, Time time, KafkaClient kafkaClient, Optional<? extends Reconfigurable> optional) {
        this.config = kafkaConfig;
        this.socketTimeout = i;
        this.time = time;
        this.networkClient = kafkaClient;
        this.reconfigurableChannelBuilder = optional;
        this.targetNode = node;
    }

    @Override // com.linkedin.kafka.cruisecontrol.client.BlockingSend
    public ClientResponse sendRequest(AbstractRequest.Builder<? extends AbstractRequest> builder) throws IOException {
        try {
            if (!NetworkClientUtils.awaitReady(this.networkClient, this.targetNode, this.time, this.socketTimeout)) {
                this.networkClient.close(this.targetNode.idString());
                throw new ConnectionException(String.format("Failed to connect to node %d within %d ms", Integer.valueOf(this.targetNode.id()), Integer.valueOf(this.socketTimeout)), new SocketTimeoutException());
            }
            try {
                return NetworkClientUtils.sendAndReceive(this.networkClient, this.networkClient.newClientRequest(this.targetNode.idString(), builder, this.time.milliseconds(), true), this.time);
            } catch (IOException e) {
                this.networkClient.close(this.targetNode.idString());
                throw e;
            }
        } catch (Exception e2) {
            this.networkClient.close(this.targetNode.idString());
            throw new ConnectionException(String.format("Failed to establish connection to node %d", Integer.valueOf(this.targetNode.id())), e2);
        }
    }

    public InitiateShutdownResponse sendShutdownRequest(InitiateShutdownRequest.Builder builder) throws IOException {
        return sendRequest(builder).responseBody();
    }

    @Override // com.linkedin.kafka.cruisecontrol.client.BlockingSend
    public void initiateClose() {
        this.networkClient.initiateClose();
    }

    @Override // com.linkedin.kafka.cruisecontrol.client.BlockingSend, java.lang.AutoCloseable
    public void close() throws Exception {
        Optional<? extends Reconfigurable> optional = this.reconfigurableChannelBuilder;
        KafkaConfig kafkaConfig = this.config;
        kafkaConfig.getClass();
        optional.ifPresent(kafkaConfig::removeReconfigurable);
        this.networkClient.close();
    }
}
