package com.linkedin.kafka.cruisecontrol.client;

import com.linkedin.kafka.cruisecontrol.BrokerShutdownIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import io.confluent.databalancer.TestConstants;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.EndPoint;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.InitiateShutdownRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests for {@link BlockingSendClient} that send
 * {@link InitiateShutdownRequest}s to the broker provided by
 * {@link BrokerShutdownIntegrationTestHarness}.
 *
 * <p>The harness is configured with a single broker (see {@link #numBrokers()}),
 * and a fresh client pointed at that broker is built before every test.
 */
@Tag(TestConstants.INTEGRATION_TEST)
@Timeout(value = 2, unit = TimeUnit.MINUTES)
public class BlockingSendClientIntegrationTest extends BrokerShutdownIntegrationTestHarness {
    /** Client under test; rebuilt in {@link #setUp()} before each test method. */
    private BlockingSendClient blockingSendClient;

    /**
     * Runs the harness set-up and then builds a {@link BlockingSendClient} that targets
     * the host and port reported by the endpoint of the broker to shut down.
     *
     * @throws Exception if the harness set-up or the client construction fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        EndPoint brokerEndPoint = brokerToShutdown.endPoint();
        BlockingSendClient.Builder clientBuilder = new BlockingSendClient.Builder(
            brokerToShutdown.kafkaServer().config(),
            new SystemTime(),
            "client-id",
            new LogContext());
        BrokerEndPoint target = new BrokerEndPoint(
            brokerToShutdownId,
            brokerEndPoint.host(),
            brokerEndPoint.port());
        blockingSendClient = clientBuilder.build(target);
    }

    /**
     * @return the number of brokers the harness should use; always {@code 1} here
     */
    @Override
    protected int numBrokers() {
        return 1;
    }

    /**
     * Sends a shutdown request carrying the broker's current epoch and expects a response
     * with the {@link Errors#NONE} code, followed by the harness-level shutdown assertion.
     *
     * @throws Exception if sending the request or the shutdown assertion fails unexpectedly
     */
    @Test
    public void testSendShutdownRequest() throws Exception {
        long currentEpoch = brokerCurrentEpoch(brokerToShutdown()).longValue();
        InitiateShutdownRequest.Builder request = new InitiateShutdownRequest.Builder(currentEpoch);

        Assertions.assertEquals(
            Errors.NONE.code(),
            blockingSendClient.sendShutdownRequest(request).data().errorCode());
        // NOTE(review): the argument presumably is the expected number of shut-down brokers;
        // confirm against the harness.
        assertBrokerShutdown(1);
    }

    /**
     * Sends a shutdown request with the fixed epoch {@code -111} and expects the response to
     * carry {@link Errors#STALE_BROKER_EPOCH}, with {@code brokerExitCalled()} still false.
     *
     * @throws Exception if sending the request fails unexpectedly
     */
    @Test
    public void testSendShutdownRequestHitsError() throws Exception {
        InitiateShutdownRequest.Builder staleRequest = new InitiateShutdownRequest.Builder(-111L);

        Assertions.assertEquals(
            Errors.STALE_BROKER_EPOCH.code(),
            blockingSendClient.sendShutdownRequest(staleRequest).data().errorCode());
        Assertions.assertFalse(brokerExitCalled());
    }

    /**
     * Shuts the broker down first and then expects that sending a shutdown request
     * through the client raises an {@link IOException}.
     *
     * @throws Exception if reading the broker epoch or shutting the broker down fails
     */
    @Test
    public void testSendRequestOnOfflineBrokerShouldRaiseIOException() throws Exception {
        // The epoch is read before the shutdown, while the broker is still running.
        long epochBeforeShutdown = brokerCurrentEpoch(brokerToShutdown()).longValue();
        brokerToShutdown().shutdown();

        Assertions.assertThrows(
            IOException.class,
            () -> blockingSendClient.sendShutdownRequest(
                new InitiateShutdownRequest.Builder(epochBeforeShutdown)));
    }
}
