package com.linkedin.kafka.cruisecontrol.server;

import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import com.linkedin.kafka.cruisecontrol.client.ConnectionException;
import com.linkedin.kafka.cruisecontrol.common.AdminClientResult;
import com.linkedin.kafka.cruisecontrol.common.KafkaCluster;
import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import kafka.cluster.BrokerEndPoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.InitiateShutdownRequest;
import org.apache.kafka.common.requests.InitiateShutdownResponse;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/server/BrokerShutdownManager.class */
/**
 * Initiates the shutdown of a single broker and waits for it to disappear from the cluster.
 *
 * <p>The shutdown is requested by sending an {@link InitiateShutdownRequest} directly to the
 * target broker through a {@link BlockingSendClient}; completion is detected by repeatedly
 * describing the cluster until the broker id is no longer listed among its nodes.
 */
public class BrokerShutdownManager {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerShutdownManager.class);
    /** Pause between two consecutive cluster descriptions while waiting for a broker to leave. */
    private static final int SHUTDOWN_WAIT_RETRY_DELAY_MS = 100;
    private final BlockingSendClient.Builder blockingSendClientBuilder;
    private final Time time;
    private final SbkAdminUtils adminUtils;
    private final long apiTimeoutMs;
    private final long shutdownWaitMs;

    /**
     * Creates a manager.
     *
     * @param sbkAdminUtils admin helper used to describe the cluster
     * @param kafkaCruiseControlConfig configuration from which
     *        {@code DEFAULT_API_TIMEOUT_MS_CONFIG} (timeout passed to each describe-cluster call) and
     *        {@code BROKER_REMOVAL_SHUTDOWN_MS_CONFIG} (how long to wait for the broker to leave)
     *        are read
     * @param builder factory for the client that sends the shutdown request to the broker
     * @param time clock used for measuring elapsed time and for sleeping between polls
     */
    public BrokerShutdownManager(SbkAdminUtils sbkAdminUtils, KafkaCruiseControlConfig kafkaCruiseControlConfig, BlockingSendClient.Builder builder, Time time) {
        this.adminUtils = sbkAdminUtils;
        this.apiTimeoutMs = kafkaCruiseControlConfig.getInt(KafkaCruiseControlConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
        this.shutdownWaitMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.BROKER_REMOVAL_SHUTDOWN_MS_CONFIG);
        this.blockingSendClientBuilder = builder;
        this.time = time;
    }

    /**
     * Shuts down the given broker if it is currently listed in the cluster.
     *
     * @param i id of the broker to shut down
     * @param optional epoch of the broker; it is only required when the broker is part of the cluster
     * @return {@code false} if the broker is not among the cluster's nodes (nothing is done),
     *         {@code true} once the shutdown has been requested and the broker has left the cluster
     * @throws ExecutionException if the cluster cannot be described, or if sending the shutdown
     *         request fails (see {@link #shutdownBroker})
     * @throws IllegalArgumentException if the broker is part of the cluster but no epoch was given
     * @throws Exception any other failure raised by {@link #shutdownBroker} or while closing the client
     */
    public boolean maybeShutdownBroker(int i, Optional<Long> optional) throws Exception {
        AdminClientResult<KafkaCluster> clusterResult = this.adminUtils.describeCluster(this.apiTimeoutMs);
        if (clusterResult.hasException()) {
            throw new ExecutionException("Failed to describe the cluster", clusterResult.exception());
        }
        Optional<Node> brokerNode = clusterResult.result().nodes().stream()
            .filter(candidate -> candidate.id() == i)
            .findAny();
        if (!brokerNode.isPresent()) {
            LOG.info("Skipping shutdown of broker {} because it's not part of the cluster.", i);
            return false;
        }
        // The epoch is validated only after the membership check, so an absent broker never fails on it.
        if (!optional.isPresent()) {
            String message = String.format("Cannot shut down broker %d because no broker epoch was given.", i);
            LOG.info(message);
            throw new IllegalArgumentException(message);
        }
        Node target = brokerNode.get();
        BrokerEndPoint endPoint = new BrokerEndPoint(target.id(), target.host(), target.port());
        // try-with-resources guarantees the client is closed exactly once, on success and on failure.
        try (BlockingSendClient shutdownClient = this.blockingSendClientBuilder.build(endPoint)) {
            shutdownBroker(shutdownClient, i, optional.get());
        }
        return true;
    }

    /**
     * Sends the shutdown request to the broker and waits for it to leave the cluster.
     *
     * <p>An {@link IOException} other than a {@link ConnectionException} is taken to mean that the
     * broker went down before it could answer, so the method proceeds to wait for the broker to
     * leave exactly as it does after a successful response.
     *
     * @param blockingSendClient client connected to the broker that should shut down
     * @param i id of the broker
     * @param j epoch of the broker, carried in the shutdown request
     * @throws ExecutionException if the connection to the broker fails or an unexpected exception
     *         occurs while sending the request
     * @throws ApiException if the broker answers with an error code other than {@link Errors#NONE}
     * @throws TimeoutException if the broker is still part of the cluster after the configured wait
     * @throws InterruptedException declared by {@link #awaitBrokerShutdown}
     */
    void shutdownBroker(BlockingSendClient blockingSendClient, int i, long j) throws ExecutionException, ApiException, TimeoutException, InterruptedException {
        String brokerDescription = String.format("broker %d (epoch %d)", i, j);
        InitiateShutdownResponse response = null;
        try {
            response = blockingSendClient.sendShutdownRequest(new InitiateShutdownRequest.Builder(j));
        } catch (ConnectionException e) {
            String message = String.format("Failed to connect to %s while trying to send shutdown request.", brokerDescription);
            LOG.error(message, e);
            throw new ExecutionException(message, e);
        } catch (IOException e) {
            // Deliberately not rethrown: response stays null and we fall through to the wait below.
            LOG.info("Caught IOException (message: {}) while trying to shutdown {}.Assuming that the broker did not manage to respond before shutting down...", e.getMessage(), brokerDescription);
        } catch (Exception e) {
            throw new ExecutionException(String.format("Unexpected exception occurred while trying to send shutdown request for %s", brokerDescription), e);
        }
        if (response != null && response.data().errorCode() != Errors.NONE.code()) {
            ApiException shutdownError = Errors.forCode(response.data().errorCode()).exception();
            LOG.error("Failed shutting down broker due to exception in shutdown request", shutdownError);
            throw shutdownError;
        }
        awaitBrokerShutdown(this.shutdownWaitMs, i);
    }

    /**
     * Polls the cluster description until the broker is no longer listed or the wait time elapses.
     *
     * <p>The cluster is always described at least once. The elapsed time is checked after each
     * description, so the total time spent can exceed {@code j} by the duration of the last poll.
     * A failed description is logged and retried like any other unsuccessful poll.
     *
     * @param j maximum time to wait, in milliseconds
     * @param i id of the broker expected to leave the cluster
     * @throws TimeoutException if the broker has not left once {@code j} milliseconds have elapsed
     * @throws InterruptedException declared for callers; not thrown explicitly by this method
     */
    void awaitBrokerShutdown(long j, int i) throws InterruptedException, TimeoutException {
        long startMs = this.time.milliseconds();
        while (true) {
            AdminClientResult<KafkaCluster> clusterResult = this.adminUtils.describeCluster(this.apiTimeoutMs);
            long elapsedMs = this.time.milliseconds() - startMs;
            if (clusterResult.hasException()) {
                LOG.warn("Failed to describe the cluster while awaiting broker {} shutdown. Retrying in {}ms", i, SHUTDOWN_WAIT_RETRY_DELAY_MS, clusterResult.exception());
            } else if (clusterResult.result().nodes().stream().noneMatch(node -> node.id() == i)) {
                LOG.info("Broker {} has left the cluster successfully ({}ms after shutdown initiation)", i, elapsedMs);
                return;
            } else {
                LOG.debug("Broker {} is still part of the cluster ({}ms after the shutdown initiation)", i, elapsedMs);
            }
            if (elapsedMs >= j) {
                throw new TimeoutException(String.format("Timed out after waiting for broker %d to shutdown for %dms", i, j));
            }
            // Sleep for the same delay that the retry log message above reports.
            this.time.sleep(SHUTDOWN_WAIT_RETRY_DELAY_MS);
        }
    }
}
