package com.linkedin.kafka.cruisecontrol.client;

import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Optional;
import java.util.Properties;
import kafka.cluster.BrokerEndPoint;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.InitiateShutdownRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import scala.Option;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/client/BlockingSendClientTest.class */
/**
 * Unit tests for {@link BlockingSendClient}.
 *
 * <p>Apart from {@link #testBuild()}, every test drives the client against a mocked
 * {@link NetworkClient} and a mocked target {@link Node}, then checks both the exception
 * surfaced by {@code sendShutdownRequest} and the calls made on the mocked network client.
 */
public class BlockingSendClientTest {
    private final int brokerId = 0;
    private final BrokerEndPoint brokerEndpoint = new BrokerEndPoint(brokerId, "localhost", 1000);
    // First argument handed to MockTime; the isReady() stubs below are keyed on this value
    // and on twice this value (presumably the mock clock's auto-tick step -- TODO confirm).
    private final long timeTickMs = 10000;
    // Written into the broker config as the controller socket timeout and also passed
    // (narrowed to int) to the BlockingSendClient constructor.
    private final long socketTimeoutMs = 9000;
    private final Time time = new MockTime(timeTickMs, 0, 0);
    private final String clientId = "test-blocking-send-client";
    private final LogContext logContext = new LogContext();
    // Rebuilt in setUp() before every test.
    private KafkaConfig config;

    @Mock
    private NetworkClient mockNetworkClient;

    @Mock
    private Node mockTargetBroker;

    /**
     * Builds the broker {@link KafkaConfig} with the controller socket timeout set to
     * {@code socketTimeoutMs}, then initializes the {@code @Mock} fields.
     */
    @Before
    public void setUp() {
        Properties brokerProps = TestUtils.createBrokerConfig(brokerId, "localhost:1234", true, true, TestUtils.RandomPort(), Option.empty(), Option.empty(), Option.empty(), true, false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), Option.empty(), 1, false, 1, (short) 1);
        brokerProps.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), Long.toString(socketTimeoutMs));
        config = KafkaConfig.fromProps(brokerProps);
        MockitoAnnotations.initMocks(this);
    }

    /**
     * Checks that a client can be built through {@link BlockingSendClient.Builder} for the
     * test broker endpoint and closed again without an exception.
     */
    @Test
    public void testBuild() throws Exception {
        BlockingSendClient.Builder clientBuilder = new BlockingSendClient.Builder(config, time, clientId, logContext);
        clientBuilder.build(brokerEndpoint).close();
    }

    /**
     * With {@code isReady} stubbed to return {@code false}, {@code sendShutdownRequest} must
     * throw a {@link ConnectionException} whose cause is a {@link SocketTimeoutException},
     * and the network client must have closed the target broker's connection.
     */
    @Test
    public void testNetworkClientNotReadyThrowsConnectionExceptionCausedBySocketTimeoutException() throws Exception {
        Mockito.when(mockNetworkClient.isReady(mockTargetBroker, timeTickMs)).thenReturn(false);
        Mockito.when(mockNetworkClient.isReady(mockTargetBroker, timeTickMs * 2)).thenReturn(false);
        try {
            BlockingSendClient client = new BlockingSendClient(mockTargetBroker, config, (int) socketTimeoutMs, time, mockNetworkClient, Optional.empty());
            client.sendShutdownRequest(new InitiateShutdownRequest.Builder(1L));
            Assert.fail("Expected sendShutdownRequest() to throw a ConnectionException");
        } catch (ConnectionException expected) {
            Throwable cause = expected.getCause();
            Assert.assertTrue("Expected the ConnectionException's cause to be SocketTimeoutException", cause instanceof SocketTimeoutException);
            Mockito.verify(mockNetworkClient, Mockito.times(1)).close(mockTargetBroker.idString());
            // One readiness check at the first timestamp, two at the second.
            Mockito.verify(mockNetworkClient, Mockito.times(1)).isReady(mockTargetBroker, timeTickMs);
            Mockito.verify(mockNetworkClient, Mockito.times(2)).isReady(mockTargetBroker, timeTickMs * 2);
        }
    }

    /**
     * With {@code isReady} stubbed to return {@code false} and {@code connectionFailed}
     * stubbed to return {@code true}, {@code sendShutdownRequest} must throw a
     * {@link ConnectionException} whose cause is an {@link IOException}, and the network
     * client must have closed the target broker's connection.
     */
    @Test
    public void testNetworkClientFailedConnectionThrowsConnectionExceptionCausedByIOException() throws Exception {
        Mockito.when(mockNetworkClient.isReady(mockTargetBroker, timeTickMs)).thenReturn(false);
        Mockito.when(mockNetworkClient.isReady(mockTargetBroker, timeTickMs * 2)).thenReturn(false);
        Mockito.when(mockNetworkClient.connectionFailed(mockTargetBroker)).thenReturn(true);
        try {
            BlockingSendClient client = new BlockingSendClient(mockTargetBroker, config, (int) socketTimeoutMs, time, mockNetworkClient, Optional.empty());
            client.sendShutdownRequest(new InitiateShutdownRequest.Builder(1L));
            Assert.fail("Expected sendShutdownRequest() to throw a ConnectionException");
        } catch (ConnectionException expected) {
            Throwable cause = expected.getCause();
            Assert.assertTrue("Expected the ConnectionException's cause to be IOException", cause instanceof IOException);
            Mockito.verify(mockNetworkClient, Mockito.times(1)).close(mockTargetBroker.idString());
            // One readiness check at the first timestamp, two at the second.
            Mockito.verify(mockNetworkClient, Mockito.times(1)).isReady(mockTargetBroker, timeTickMs);
            Mockito.verify(mockNetworkClient, Mockito.times(2)).isReady(mockTargetBroker, timeTickMs * 2);
        }
    }

    /**
     * With {@code isReady} stubbed to return {@code true} and {@code active} stubbed to
     * return {@code false}, {@code sendShutdownRequest} must throw an {@link IOException},
     * and the network client must have closed the target broker's connection after a
     * single readiness check.
     */
    @Test
    public void testNetworkClientSendExceptionClosesClientAndIsRethrown() {
        InitiateShutdownRequest.Builder shutdownRequest = new InitiateShutdownRequest.Builder(1L);
        Mockito.when(mockNetworkClient.isReady(mockTargetBroker, timeTickMs)).thenReturn(true);
        Mockito.when(mockNetworkClient.active()).thenReturn(false);
        try {
            BlockingSendClient client = new BlockingSendClient(mockTargetBroker, config, (int) socketTimeoutMs, time, mockNetworkClient, Optional.empty());
            client.sendShutdownRequest(shutdownRequest);
            Assert.fail("Expected sendShutdownRequest() to throw an IOException");
        } catch (IOException expected) {
            Mockito.verify(mockNetworkClient, Mockito.times(1)).close(mockTargetBroker.idString());
            Mockito.verify(mockNetworkClient, Mockito.times(1)).isReady(mockTargetBroker, timeTickMs);
        }
    }
}
