package com.linkedin.kafka.cruisecontrol.server;

import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import com.linkedin.kafka.cruisecontrol.client.ConnectionException;
import com.linkedin.kafka.cruisecontrol.common.AdminClientResult;
import com.linkedin.kafka.cruisecontrol.common.KafkaCluster;
import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.cluster.BrokerEndPoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.InitiateShutdownResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.InitiateShutdownRequest;
import org.apache.kafka.common.requests.InitiateShutdownResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/server/BrokerShutdownManagerTest.class */
public class BrokerShutdownManagerTest {
    private final String apiTimeoutMs = "100";
    private final String removalShutdownMsStr = "1000";
    private final long mockTimeTickMs = 100;
    private final long removalShutdownMs = Long.parseLong("1000");
    private final long mockTimeTicksToTimeout = this.removalShutdownMs / 100;
    private final long brokerEpoch = 1;
    private final Set<Integer> brokerIdsToRemove = new HashSet(Arrays.asList(3, 4));
    private final Map<Integer, Optional<Long>> brokerIdsToRemoveWithValidEpochs = (Map) this.brokerIdsToRemove.stream().collect(Collectors.toMap(num -> {
        return num;
    }, num2 -> {
        return Optional.of(1L);
    }));
    private final Map<Integer, Optional<Long>> brokerIdsToRemoveWithNoEpochs = (Map) this.brokerIdsToRemove.stream().collect(Collectors.toMap(num -> {
        return num;
    }, num2 -> {
        return Optional.empty();
    }));
    private final int firstBrokerIdToRemove = 3;
    private final Node firstBrokerToRemove = node(3);
    private final int secondBrokerIdToRemove = 4;
    private final Node secondBrokerToRemove = node(4);
    private final List<Node> brokersToRemove = Arrays.asList(this.firstBrokerToRemove, this.secondBrokerToRemove);
    private final List<Node> clusterWithAllBrokersRemoved = Arrays.asList(node(0), node(1), node(2));
    private final List<Node> fullCluster = (List) Stream.concat(this.clusterWithAllBrokersRemoved.stream(), this.brokersToRemove.stream()).collect(Collectors.toList());
    private final List<Node> clusterWithOneBrokerRemoved = fullClusterWithoutNode(node(4));
    private final AdminClientResult<KafkaCluster> fullClusterResult = clusterResult(this.fullCluster);
    private final AdminClientResult<KafkaCluster> oneBrokerRemovedClusterResult = clusterResult(this.clusterWithOneBrokerRemoved);
    private final AdminClientResult<KafkaCluster> allBrokersRemovedClusterResult = clusterResult(this.clusterWithAllBrokersRemoved);
    private SbkAdminUtils mockAdminUtils;
    private Time mockTime;
    private BrokerShutdownManager removalManager;
    private BlockingSendClient mockShutdownClient;
    private BlockingSendClient.Builder mockShutdownClientBuilder;

    @BeforeEach
    public void setUp() {
        this.mockTime = new MockTime();
        this.mockAdminUtils = (SbkAdminUtils) Mockito.mock(SbkAdminUtils.class);
        this.mockShutdownClientBuilder = (BlockingSendClient.Builder) Mockito.mock(BlockingSendClient.Builder.class);
        this.mockShutdownClient = (BlockingSendClient) Mockito.mock(BlockingSendClient.class);
        Mockito.when(this.mockShutdownClientBuilder.build((BrokerEndPoint) ArgumentMatchers.any())).thenReturn(this.mockShutdownClient);
        this.removalManager = new BrokerShutdownManager(this.mockAdminUtils, config(), this.mockShutdownClientBuilder, this.mockTime);
    }

    @Test
    public void testMaybeShutdownBrokerShutsDownAllBrokers() throws Exception {
        mockDescribeConsecutiveResults(this.fullClusterResult, this.allBrokersRemovedClusterResult);
        mockSuccessfulShutdown();
        Map maybeShutdownBrokers = this.removalManager.maybeShutdownBrokers(this.brokerIdsToRemoveWithValidEpochs);
        Assertions.assertTrue(maybeShutdownBrokers.values().stream().allMatch(bool -> {
            return bool.booleanValue();
        }), String.format("Expected to have shut down all the brokers proactively in %s", maybeShutdownBrokers));
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(2))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(2);
    }

    @Test
    public void testMaybeShutdownBrokerShutsDownOnlyOnlineBrokers() throws Exception {
        mockDescribeConsecutiveResults(this.oneBrokerRemovedClusterResult, this.allBrokersRemovedClusterResult);
        mockSuccessfulShutdown();
        HashMap hashMap = new HashMap();
        hashMap.put(3, Optional.of(1L));
        hashMap.put(4, Optional.empty());
        Map maybeShutdownBrokers = this.removalManager.maybeShutdownBrokers(hashMap);
        Assertions.assertTrue(((Boolean) maybeShutdownBrokers.get(3)).booleanValue(), "Expected first broker to be online and therefore shut down by the manager");
        Assertions.assertFalse(((Boolean) maybeShutdownBrokers.get(4)).booleanValue(), "Expected the second broker to be offline and therefore not shut down by the manager");
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(2);
    }

    @Test
    public void testMaybeShutdownThrowsExceptionIfUnableToInitiallyGetClusterMetadata() throws Exception {
        Mockito.when(this.mockAdminUtils.describeCluster(Integer.parseInt("100"))).thenReturn(new AdminClientResult(new Exception("failed!")));
        Assertions.assertThrows(ExecutionException.class, () -> {
            this.removalManager.maybeShutdownBrokers(this.brokerIdsToRemoveWithValidEpochs);
        });
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(0))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(1);
    }

    @Test
    public void testMaybeShutdownDoesNotInitiateShutdownIfAllBrokersNotPartOfCluster() throws Exception {
        mockDescribeClusterWithBrokersRemoved();
        Assertions.assertEquals(this.removalManager.maybeShutdownBrokers(this.brokerIdsToRemoveWithValidEpochs), (Map) this.brokerIdsToRemove.stream().collect(Collectors.toMap(num -> {
            return num;
        }, num2 -> {
            return false;
        })), "Expected to not have proactively shut down brokers since they were not part of the cluster");
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(0))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(1);
    }

    @Test
    public void testMaybeShutdownDoesNotInitiateShutdownIfBrokerEpochForAllBrokersNotProvided() throws Exception {
        mockDescribeClusterFullCluster();
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.removalManager.maybeShutdownBrokers(this.brokerIdsToRemoveWithNoEpochs);
        });
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(0))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
        verifyDescribeClusterCalled(1);
    }

    @Test
    public void testShutdownBrokerShutsDownBroker() throws Exception {
        mockSuccessfulShutdown();
        this.removalManager.initiateBrokerShutdown(this.mockShutdownClient, 3, 1L);
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void testShutdownBrokerHandlesNoShutdownResponse() throws Exception {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenThrow(new Throwable[]{new IOException("disconnected!")});
        this.removalManager.initiateBrokerShutdown(this.mockShutdownClient, 3, 1L);
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void testShutdownBrokerHandlesAndReThrowsConnectionException() throws Exception {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenThrow(new Throwable[]{new ConnectionException("Could not connect!", (Throwable) null)});
        Assertions.assertTrue(((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            this.removalManager.initiateBrokerShutdown(this.mockShutdownClient, 3, 1L);
        })).getCause() instanceof ConnectionException, "Expected cause to be ConnectionException");
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void testShutdownBrokerHandlesAndReThrowsGenericException() throws Exception {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            throw new Exception("Something generic!");
        });
        Assertions.assertTrue(((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            this.removalManager.initiateBrokerShutdown(this.mockShutdownClient, 3, 1L);
        })).getCause() instanceof Exception, "Expected cause to be Exception");
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void testShutdownBrokerHandlesAndThrowsExceptionOnResponseAPIError() throws Exception {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenReturn(new InitiateShutdownResponse(new InitiateShutdownResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code()).setErrorMessage("Stale epoch!")));
        Assertions.assertTrue(Assertions.assertThrows(ApiException.class, () -> {
            this.removalManager.initiateBrokerShutdown(this.mockShutdownClient, 3, 1L);
        }) instanceof StaleBrokerEpochException, "Expected exception to be StaleBrokerEpochException");
        ((BlockingSendClient) Mockito.verify(this.mockShutdownClient, Mockito.times(1))).sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void testAwaitBrokerShutdownThrowsWhenOneBrokerDoesntGoOffline() throws InterruptedException {
        Mockito.when(this.mockAdminUtils.describeCluster(Integer.parseInt("100"))).thenAnswer(invocationOnMock -> {
            return this.oneBrokerRemovedClusterResult;
        });
        Assertions.assertThrows(TimeoutException.class, () -> {
            this.removalManager.awaitBrokersShutdown(this.removalShutdownMs, this.brokerIdsToRemove);
        });
        verifyDescribeClusterCalled(((int) this.mockTimeTicksToTimeout) + 1);
    }

    @Test
    public void testAwaitBrokerShutdownTimesOutIfABrokerComesBackOnline() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        AdminClientResult<KafkaCluster> clusterResult = clusterResult(fullClusterWithoutNode(this.firstBrokerToRemove));
        AdminClientResult<KafkaCluster> clusterResult2 = clusterResult(fullClusterWithoutNode(this.secondBrokerToRemove));
        Mockito.when(this.mockAdminUtils.describeCluster(Integer.parseInt("100"))).thenAnswer(invocationOnMock -> {
            int incrementAndGet = atomicInteger.incrementAndGet();
            return incrementAndGet == 1 ? this.fullClusterResult : incrementAndGet == 2 ? clusterResult : clusterResult2;
        });
        Assertions.assertThrows(TimeoutException.class, () -> {
            this.removalManager.awaitBrokersShutdown(this.removalShutdownMs, this.brokerIdsToRemove);
        });
        verifyDescribeClusterCalled(((int) this.mockTimeTicksToTimeout) + 1);
    }

    @Test
    public void testAwaitBrokerShutdownDescribeClusterCallsFailRetriesAndWaitsUntilTimeoutAndThrowsException() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        Mockito.when(this.mockAdminUtils.describeCluster(Integer.parseInt("100"))).thenAnswer(invocationOnMock -> {
            return atomicInteger.incrementAndGet() % 2 == 0 ? new AdminClientResult(new Exception("error")) : this.oneBrokerRemovedClusterResult;
        });
        Assertions.assertThrows(TimeoutException.class, () -> {
            this.removalManager.awaitBrokersShutdown(this.removalShutdownMs, this.brokerIdsToRemove);
        });
        verifyDescribeClusterCalled(((int) this.mockTimeTicksToTimeout) + 1);
    }

    @Test
    public void testAwaitBrokerShutdownStopsWhenDescribeClusterReturnsBrokersShutdown() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        Mockito.when(this.mockAdminUtils.describeCluster(Integer.parseInt("100"))).thenAnswer(invocationOnMock -> {
            return atomicInteger.incrementAndGet() == 5 ? this.allBrokersRemovedClusterResult : this.oneBrokerRemovedClusterResult;
        });
        this.removalManager.awaitBrokersShutdown(this.removalShutdownMs, this.brokerIdsToRemove);
        verifyDescribeClusterCalled(5);
    }

    private void mockDescribeConsecutiveResults(AdminClientResult<KafkaCluster> adminClientResult, AdminClientResult<KafkaCluster> adminClientResult2) throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        Mockito.when(this.mockAdminUtils.describeCluster(Integer.parseInt("100"))).thenAnswer(invocationOnMock -> {
            return atomicInteger.incrementAndGet() >= 2 ? adminClientResult2 : adminClientResult;
        });
    }

    private void mockDescribeClusterWithBrokersRemoved() throws InterruptedException {
        Mockito.when(this.mockAdminUtils.describeCluster(Integer.parseInt("100"))).thenReturn(new AdminClientResult(new KafkaCluster(this.clusterWithAllBrokersRemoved, (Node) null, (String) null, (Set) null)));
    }

    private void mockDescribeClusterFullCluster() throws InterruptedException {
        Mockito.when(this.mockAdminUtils.describeCluster(Integer.parseInt("100"))).thenReturn(new AdminClientResult(new KafkaCluster(this.fullCluster, (Node) null, (String) null, (Set) null)));
    }

    private void verifyDescribeClusterCalled(int i) throws InterruptedException {
        ((SbkAdminUtils) Mockito.verify(this.mockAdminUtils, Mockito.times(i))).describeCluster(Integer.parseInt("100"));
    }

    private void mockSuccessfulShutdown() throws IOException {
        Mockito.when(this.mockShutdownClient.sendShutdownRequest((InitiateShutdownRequest.Builder) ArgumentMatchers.any())).thenReturn(new InitiateShutdownResponse(new InitiateShutdownResponseData().setErrorCode(Errors.NONE.code())));
    }

    private KafkaCruiseControlConfig config() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bootstrap.servers");
        properties.setProperty("zookeeper.connect", "connect:1234");
        properties.setProperty("default.api.timeout.ms", "100");
        properties.setProperty("broker.removal.shutdown.timeout.ms", "1000");
        return new KafkaCruiseControlConfig(properties);
    }

    private List<Node> fullClusterWithoutNode(Node node) {
        return (List) this.fullCluster.stream().filter(node2 -> {
            return node2.id() != node.id();
        }).collect(Collectors.toList());
    }

    private Node node(int i) {
        return new Node(i, "", 9000);
    }

    private AdminClientResult<KafkaCluster> clusterResult(List<Node> list) {
        return new AdminClientResult<>(new KafkaCluster(list, (Node) null, (String) null, (Set) null));
    }
}
