package com.linkedin.kafka.cruisecontrol.server;

import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import com.linkedin.kafka.cruisecontrol.client.ConnectionException;
import com.linkedin.kafka.cruisecontrol.common.AdminClientResult;
import com.linkedin.kafka.cruisecontrol.common.KafkaCluster;
import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.cluster.BrokerEndPoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.InitiateShutdownResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.InitiateShutdownRequest;
import org.apache.kafka.common.requests.InitiateShutdownResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/server/BrokerShutdownManagerTest.class */
public class BrokerShutdownManagerTest {
    private static String apiTimeoutMs = "100";
    private static String removalShutdownMs = "1000";
    private static long mockTimeTickMs = 100;
    private static long brokerEpoch = 1;
    private static int brokerIdToRemove = 3;
    private List<Node> fullCluster = Arrays.asList(new Node(0, "", 9000), new Node(1, "", 9000), new Node(2, "", 9000), new Node(brokerIdToRemove, "", 9000));
    private List<Node> clusterWithBrokerRemoved = Arrays.asList(new Node(0, "", 9000), new Node(1, "", 9000), new Node(2, "", 9000));
    private AdminClientResult<KafkaCluster> fullClusterOpt = new AdminClientResult<>(new KafkaCluster(this.fullCluster, (Node) null, (String) null, (Set) null));
    private AdminClientResult<KafkaCluster> noBrokerClusterOpt = new AdminClientResult<>(new KafkaCluster(this.clusterWithBrokerRemoved, (Node) null, (String) null, (Set) null));
    private SbkAdminUtils mockAdminUtils;
    private Time mockTime;
    private BrokerShutdownManager removalManager;
    private BlockingSendClient mockShutdownClient;
    private BlockingSendClient.Builder mockShutdownClientBuilder;

    @Before
    public void setUp() {
        this.mockTime = new MockTime();
        this.mockAdminUtils = (SbkAdminUtils) Mockito.mock(SbkAdminUtils.class);
        this.mockShutdownClientBuilder = (BlockingSendClient.Builder) Mockito.mock(BlockingSendClient.Builder.class);
        this.mockShutdownClient = (BlockingSendClient) Mockito.mock(BlockingSendClient.class);
        Mockito.when(this.mockShutdownClientBuilder.build((BrokerEndPoint) ArgumentMatchers.any())).thenReturn(this.mockShutdownClient);
        this.removalManager = new BrokerShutdownManager(this.mockAdminUtils, config(), this.mockShutdownClientBuilder, this.mockTime);
    }

    @Test
    public void testMaybeShutdownBrokerShutsDownBroker() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        Mockito.when(this.mockAdminUtils.describeCluster(Long.parseLong(apiTimeoutMs))).thenAnswer(invocationOnMock -> {
            return atomicInteger.incrementAndGet() == 2 ? this.noBrokerClusterOpt : this.fullClusterOpt;
        });
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenReturn(new InitiateShutdownResponse(new InitiateShutdownResponseData().setErrorCode(Errors.NONE.code())));
        Assert.assertTrue("Expected to have shut down the broker proactively", this.removalManager.maybeShutdownBroker(brokerIdToRemove, Optional.of(Long.valueOf(brokerEpoch))));
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(2);
    }

    @Test
    public void testMaybeShutdownThrowsExceptionIfUnableToInitiallyGetClusterMetadata() throws Exception {
        Mockito.when(this.mockAdminUtils.describeCluster(Long.parseLong(apiTimeoutMs))).thenReturn(new AdminClientResult(new Exception("failed!")));
        Assert.assertThrows(ExecutionException.class, () -> {
            this.removalManager.maybeShutdownBroker(brokerIdToRemove, Optional.of(Long.valueOf(brokerEpoch)));
        });
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(0))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(1);
    }

    @Test
    public void testMaybeShutdownDoesNotInitiateShutdownIfBrokerNotPartOfCluster() throws Exception {
        mockDescribeClusterBrokerRemoved();
        Assert.assertFalse("Expected to not have proactively shut down broker since it was not part of the cluster", this.removalManager.maybeShutdownBroker(brokerIdToRemove, Optional.of(Long.valueOf(brokerEpoch))));
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(0))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(1);
    }

    @Test
    public void testMaybeShutdownDoesNotInitiateShutdownIfBrokerEpochNotProvided() throws Exception {
        mockDescribeClusterFullCluster();
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.removalManager.maybeShutdownBroker(brokerIdToRemove, Optional.empty());
        });
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(0))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(1);
    }

    @Test
    public void testShutdownBrokerShutsDownBroker() throws Exception {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenReturn(new InitiateShutdownResponse(new InitiateShutdownResponseData().setErrorCode(Errors.NONE.code())));
        mockDescribeClusterBrokerRemoved();
        this.removalManager.shutdownBroker(this.mockShutdownClient, brokerIdToRemove, brokerEpoch);
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(1);
    }

    @Test
    public void testShutdownBrokerShutsDownBrokerAndThrowsTimeoutExceptionIfItTakesLongToLeaveCluster() throws Exception {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenReturn(new InitiateShutdownResponse(new InitiateShutdownResponseData().setErrorCode(Errors.NONE.code())));
        mockDescribeClusterFullCluster();
        Assert.assertThrows(TimeoutException.class, () -> {
            this.removalManager.shutdownBroker(this.mockShutdownClient, brokerIdToRemove, brokerEpoch);
        });
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void testShutdownBrokerHandlesNoShutdownResponse() throws Exception {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenThrow(new Throwable[]{new IOException("disconnected!")});
        mockDescribeClusterBrokerRemoved();
        this.removalManager.shutdownBroker(this.mockShutdownClient, brokerIdToRemove, brokerEpoch);
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(1);
    }

    @Test
    public void testShutdownBrokerHandlesAndReThrowsConnectionException() throws Exception {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenThrow(new Throwable[]{new ConnectionException("Could not connect!", (Throwable) null)});
        Assert.assertTrue("Expected cause to be ConnectionException", ((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
            this.removalManager.shutdownBroker(this.mockShutdownClient, brokerIdToRemove, brokerEpoch);
        })).getCause() instanceof ConnectionException);
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void testShutdownBrokerHandlesAndReThrowsGenericException() throws Exception {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            throw new Exception("Something generic!");
        });
        Assert.assertTrue("Expected cause to be Exception", ((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
            this.removalManager.shutdownBroker(this.mockShutdownClient, brokerIdToRemove, brokerEpoch);
        })).getCause() instanceof Exception);
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void testShutdownBrokerHandlesAndThrowsExceptionOnResponseAPIError() throws Exception {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenReturn(new InitiateShutdownResponse(new InitiateShutdownResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code()).setErrorMessage("Stale epoch!")));
        Assert.assertTrue("Expected exception to be StaleBrokerEpochException", Assert.assertThrows(ApiException.class, () -> {
            this.removalManager.shutdownBroker(this.mockShutdownClient, brokerIdToRemove, brokerEpoch);
        }) instanceof StaleBrokerEpochException);
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void testAwaitBrokerShutdownDescribeClusterCallsFailRetriesAndWaitsUntilTimeoutAndThrowsException() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        Mockito.when(this.mockAdminUtils.describeCluster(Long.parseLong(apiTimeoutMs))).thenAnswer(invocationOnMock -> {
            return atomicInteger.incrementAndGet() % 2 == 0 ? new AdminClientResult(new Exception("error")) : this.fullClusterOpt;
        });
        try {
            this.removalManager.awaitBrokerShutdown(mockTimeTickMs * 10, brokerIdToRemove);
            Assert.fail("Expected a TimeoutException to be thrown");
        } catch (TimeoutException e) {
        }
        verifyDescribeClusterCalled(11);
    }

    @Test
    public void testAwaitBrokerShutdownStopsWhenDescribeClusterReturnsBrokerMissing() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        Mockito.when(this.mockAdminUtils.describeCluster(Long.parseLong(apiTimeoutMs))).thenAnswer(invocationOnMock -> {
            return atomicInteger.incrementAndGet() == 5 ? this.noBrokerClusterOpt : this.fullClusterOpt;
        });
        this.removalManager.awaitBrokerShutdown(mockTimeTickMs * 10, brokerIdToRemove);
        verifyDescribeClusterCalled(5);
    }

    private void mockDescribeClusterBrokerRemoved() throws InterruptedException {
        Mockito.when(this.mockAdminUtils.describeCluster(Long.parseLong(apiTimeoutMs))).thenReturn(new AdminClientResult(new KafkaCluster(this.clusterWithBrokerRemoved, (Node) null, (String) null, (Set) null)));
    }

    private void mockDescribeClusterFullCluster() throws InterruptedException {
        Mockito.when(this.mockAdminUtils.describeCluster(Long.parseLong(apiTimeoutMs))).thenReturn(new AdminClientResult(new KafkaCluster(this.fullCluster, (Node) null, (String) null, (Set) null)));
    }

    private void verifyDescribeClusterCalled(int i) throws InterruptedException {
        ((SbkAdminUtils) Mockito.verify(this.mockAdminUtils, Mockito.times(i))).describeCluster(Long.parseLong(apiTimeoutMs));
    }

    private KafkaCruiseControlConfig config() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bootstrap.servers");
        properties.setProperty("zookeeper.connect", "connect:1234");
        properties.setProperty("default.api.timeout.ms", apiTimeoutMs);
        properties.setProperty("broker.removal.shutdown.timeout.ms", removalShutdownMs);
        return new KafkaCruiseControlConfig(properties);
    }
}
