package kafka.server;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import kafka.utils.TestUtils$TestControllerRequestCompletionHandler$;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.EnvelopeRequest;
import org.apache.kafka.common.requests.EnvelopeResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.reflect.ScalaSignature;

/* compiled from: NodeToControllerRequestThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001]4Aa\u0004\t\u0001+!)A\u0004\u0001C\u0001;!)\u0001\u0005\u0001C\u0005C!)Q\u0007\u0001C\u0005m!)q\u0007\u0001C\u0001q!)q\t\u0001C\u0001q!)\u0011\n\u0001C\u0001q!)1\n\u0001C\u0001q!)Q\n\u0001C\u0001q!)q\n\u0001C\u0001q!)\u0011\u000b\u0001C\u0001q!)1\u000b\u0001C\u0001q!)Q\u000b\u0001C\u0001q!)q\u000b\u0001C\u00051\"91\u000eAI\u0001\n\u0013a'!\t(pI\u0016$vnQ8oiJ|G\u000e\\3s%\u0016\fX/Z:u)\"\u0014X-\u00193UKN$(BA\t\u0013\u0003\u0019\u0019XM\u001d<fe*\t1#A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u00011\u0002CA\f\u001b\u001b\u0005A\"\"A\r\u0002\u000bM\u001c\u0017\r\\1\n\u0005mA\"AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002=A\u0011q\u0004A\u0007\u0002!\u0005q1m\u001c8ue>dG.\u001a:J]\u001a|GC\u0001\u0012&!\ty2%\u0003\u0002%!\t)2i\u001c8ue>dG.\u001a:J]\u001a|'/\\1uS>t\u0007\"\u0002\u0014\u0003\u0001\u00049\u0013\u0001\u00028pI\u0016\u00042a\u0006\u0015+\u0013\tI\u0003D\u0001\u0004PaRLwN\u001c\t\u0003WMj\u0011\u0001\f\u0006\u0003[9\naaY8n[>t'BA\n0\u0015\t\u0001\u0014'\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002e\u0005\u0019qN]4\n\u0005Qb#\u0001\u0002(pI\u0016\f1#Z7qif\u001cuN\u001c;s_2dWM]%oM>,\u0012AI\u0001,i\u0016\u001cHOU3uef$\u0016.\\3pkR<\u0006.\u001b7f\u0007>tGO]8mY\u0016\u0014hj\u001c;Bm\u0006LG.\u00192mKR\t\u0011\b\u0005\u0002\u0018u%\u00111\b\u0007\u0002\u0005+:LG\u000f\u000b\u0002\u0005{A\u0011a(R\u0007\u0002\u007f)\u0011\u0001)Q\u0001\u0004CBL'B\u0001\"D\u0003\u001dQW\u000f]5uKJT!\u0001R\u0019\u0002\u000b),h.\u001b;\n\u0005\u0019{$\u0001\u0002+fgR\f\u0001\u0003^3tiJ+\u0017/^3tiN\u001cVM\u001c;)\u0005\u0015i\u0014!\u0006;fgR\u001cuN\u001c;s_2dWM]\"iC:<W\r\u001a\u0015\u0003\ru\n\u0011\u0003^3ti:{GoQ8oiJ|G\u000e\\3sQ\t9Q(\u0001\u0016uKN$XI\u001c<fY>\u0004XMU3ta>t7/Z,ji\"tu\u000e^\"p]R\u0014x\u000e\u001c7fe\u0016\u0013(o\u001c:)\u0005!i\u0014\u0001\u0005;fgR\u0014V\r\u001e:z)&lWm\\;uQ\tIQ(\u0001\u0010uKN$XK\\:vaB|'\u000f^3e-\u0016\u00148/[8o\u0011\u0006tG\r\\5oO\"\u0012!\"P\u0001$i\u0016\u001cH/Q;uQ\u0016tG/[2bi&|g.\u
0012=dKB$\u0018n\u001c8IC:$G.\u001b8hQ\tYQ(\u0001\u000buKN$H\u000b\u001b:fC\u0012tu\u000e^*uCJ$X\r\u001a\u0015\u0003\u0019u\n\u0011\u0002]8mYVsG/\u001b7\u0015\teJfL\u001a\u0005\u000656\u0001\raW\u0001\u000ee\u0016\fX/Z:u)\"\u0014X-\u00193\u0011\u0005}a\u0016BA/\u0011\u0005uqu\u000eZ3U_\u000e{g\u000e\u001e:pY2,'OU3rk\u0016\u001cH\u000f\u00165sK\u0006$\u0007\"B0\u000e\u0001\u0004\u0001\u0017!C2p]\u0012LG/[8o!\r9\u0012mY\u0005\u0003Eb\u0011\u0011BR;oGRLwN\u001c\u0019\u0011\u0005]!\u0017BA3\u0019\u0005\u001d\u0011un\u001c7fC:DqaZ\u0007\u0011\u0002\u0003\u0007\u0001.\u0001\u0006nCb\u0014V\r\u001e:jKN\u0004\"aF5\n\u0005)D\"aA%oi\u0006\u0019\u0002o\u001c7m+:$\u0018\u000e\u001c\u0013eK\u001a\fW\u000f\u001c;%gU\tQN\u000b\u0002i].\nq\u000e\u0005\u0002qk6\t\u0011O\u0003\u0002sg\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0003ib\t!\"\u00198o_R\fG/[8o\u0013\t1\u0018OA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\u0004")
/* loaded from: input_file:kafka/server/NodeToControllerRequestThreadTest.class */
/**
 * Tests for {@link NodeToControllerRequestThread}.
 *
 * <p>Every test drives the thread synchronously by calling {@code doWork()} directly, using a
 * {@link MockClient} as the network client, a Mockito mock of {@link ControllerNodeProvider} to
 * script which controller is reported, and a {@link MockTime} to advance the clock.
 */
public class NodeToControllerRequestThreadTest {
    /** Retry timeout handed to the request thread by the tests that exercise timeouts. */
    private static final long RETRY_TIMEOUT_MS = 30000L;

    /** Number of {@code doWork()} iterations {@link #pollUntil} performs when no limit is given. */
    private static final int DEFAULT_MAX_POLLS = 10;

    /**
     * Wraps an optional controller node in a {@link ControllerInformation} that uses an empty
     * listener name, the PLAINTEXT security protocol, an empty fourth argument and {@code true}
     * as the last flag.
     */
    private ControllerInformation controllerInfo(Option<Node> option) {
        return new ControllerInformation(option, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", true);
    }

    /** Returns a {@link ControllerInformation} that carries no controller node. */
    private ControllerInformation emptyControllerInfo() {
        return controllerInfo(Option.empty());
    }

    /**
     * Builds the broker configuration shared by all tests: node id 1, ZooKeeper connect string
     * {@code localhost:2181}, and the Scala default value for every remaining
     * {@code createBrokerConfig} parameter.
     */
    private static KafkaConfig newKafkaConfig() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        return new KafkaConfig(testUtils.createBrokerConfig(
            1,
            "localhost:2181",
            testUtils.createBrokerConfig$default$3(),
            testUtils.createBrokerConfig$default$4(),
            testUtils.createBrokerConfig$default$5(),
            testUtils.createBrokerConfig$default$6(),
            testUtils.createBrokerConfig$default$7(),
            testUtils.createBrokerConfig$default$8(),
            testUtils.createBrokerConfig$default$9(),
            testUtils.createBrokerConfig$default$10(),
            testUtils.createBrokerConfig$default$11(),
            testUtils.createBrokerConfig$default$12(),
            testUtils.createBrokerConfig$default$13(),
            testUtils.createBrokerConfig$default$14(),
            testUtils.createBrokerConfig$default$15(),
            testUtils.createBrokerConfig$default$16(),
            testUtils.createBrokerConfig$default$17(),
            testUtils.createBrokerConfig$default$18(),
            testUtils.createBrokerConfig$default$19(),
            testUtils.createBrokerConfig$default$20(),
            testUtils.createBrokerConfig$default$21()));
    }

    /** Creates a {@link MockClient} on the given clock, backed by a mocked {@link Metadata}. */
    private static MockClient newMockClient(MockTime time) {
        return new MockClient(time, Mockito.mock(Metadata.class));
    }

    /**
     * Creates a mocked {@link ControllerNodeProvider} whose {@code getControllerInfo()} returns
     * {@code first} on the first call and then each of {@code subsequent} in order.
     */
    private static ControllerNodeProvider mockControllerNodeProvider(
            ControllerInformation first, ControllerInformation... subsequent) {
        ControllerNodeProvider provider = Mockito.mock(ControllerNodeProvider.class);
        Mockito.when(provider.getControllerInfo()).thenReturn(first, subsequent);
        return provider;
    }

    /**
     * Constructs a request thread (without marking it started) that uses {@code client} both as
     * its initial network client and as the result of its network-client factory.
     *
     * @param client          network client the thread talks through
     * @param provider        source of controller information
     * @param time            clock shared with the test
     * @param retryTimeoutMs  retry timeout passed to the thread
     */
    private static NodeToControllerRequestThread newRequestThread(
            MockClient client, ControllerNodeProvider provider, MockTime time, long retryTimeoutMs) {
        return new NodeToControllerRequestThread(
            client,
            true,
            controllerInformation -> client,
            new ManualMetadataUpdater(),
            provider,
            newKafkaConfig(),
            time,
            "",
            retryTimeoutMs);
    }

    /** Same as {@link #newRequestThread} but also sets the thread's {@code started} flag. */
    private static NodeToControllerRequestThread newStartedRequestThread(
            MockClient client, ControllerNodeProvider provider, MockTime time, long retryTimeoutMs) {
        NodeToControllerRequestThread requestThread = newRequestThread(client, provider, time, retryTimeoutMs);
        requestThread.started_$eq(true);
        return requestThread;
    }

    /** Builds a queue item, stamped with the current mock time, that sends a metadata request. */
    private static NodeToControllerQueueItem metadataQueueItem(
            MockTime time, MetadataRequestData data, ControllerRequestCompletionHandler handler) {
        return new NodeToControllerQueueItem(time.milliseconds(), new MetadataRequest.Builder(data), handler);
    }

    /**
     * Returns a completion handler that stores the completed response in {@code sink} and fails
     * the test if the request times out.
     */
    private static ControllerRequestCompletionHandler capturingHandler(AtomicReference<ClientResponse> sink) {
        return new ControllerRequestCompletionHandler() {
            @Override
            public void onTimeout() {
                Assertions.fail("Unexpected timeout exception");
            }

            @Override
            public void onComplete(ClientResponse clientResponse) {
                sink.set(clientResponse);
            }
        };
    }

    /**
     * With no controller available the request stays queued after one pass; once the retry
     * timeout has elapsed the next pass removes it and reports a timeout to the handler.
     */
    @Test
    public void testRetryTimeoutWhileControllerNotAvailable() {
        MockTime time = new MockTime();
        MockClient client = newMockClient(time);
        ControllerNodeProvider provider = mockControllerNodeProvider(emptyControllerInfo());
        NodeToControllerRequestThread requestThread =
            newStartedRequestThread(client, provider, time, RETRY_TIMEOUT_MS);

        TestUtils.TestControllerRequestCompletionHandler handler =
            new TestUtils.TestControllerRequestCompletionHandler(Option.empty());
        requestThread.enqueue(metadataQueueItem(time, new MetadataRequestData(), handler));

        requestThread.doWork();
        Assertions.assertEquals(1, requestThread.queueSize());

        time.sleep(RETRY_TIMEOUT_MS);
        requestThread.doWork();
        Assertions.assertEquals(0, requestThread.queueSize());
        Assertions.assertTrue(handler.timedOut().get());
    }

    /** A queued request is drained and completed after two passes when a response is prepared. */
    @Test
    public void testRequestsSent() {
        MockTime time = new MockTime();
        MockClient client = newMockClient(time);
        Node controller = new Node(2, "host", 1234);
        ControllerNodeProvider provider = mockControllerNodeProvider(controllerInfo(new Some<>(controller)));
        MetadataResponse expectedResponse =
            RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2));
        NodeToControllerRequestThread requestThread =
            newStartedRequestThread(client, provider, time, Long.MAX_VALUE);

        client.prepareResponse(expectedResponse);
        TestUtils.TestControllerRequestCompletionHandler handler =
            new TestUtils.TestControllerRequestCompletionHandler(new Some<>(expectedResponse));
        requestThread.enqueue(metadataQueueItem(time, new MetadataRequestData(), handler));
        Assertions.assertEquals(1, requestThread.queueSize());

        requestThread.doWork();
        requestThread.doWork();
        Assertions.assertEquals(0, requestThread.queueSize());
        Assertions.assertTrue(handler.completed().get());
    }

    /**
     * The provider reports host1 first and host2 afterwards. After host1 is made unreachable,
     * three further passes are enough for the request to complete.
     */
    @Test
    public void testControllerChanged() {
        MockTime time = new MockTime();
        MockClient client = newMockClient(time);
        Node oldController = new Node(1, "host1", 1234);
        ControllerNodeProvider provider = mockControllerNodeProvider(
            controllerInfo(new Some<>(oldController)),
            controllerInfo(new Some<>(new Node(2, "host2", 1234))));
        MetadataResponse expectedResponse =
            RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2));
        NodeToControllerRequestThread requestThread =
            newStartedRequestThread(client, provider, time, Long.MAX_VALUE);

        TestUtils.TestControllerRequestCompletionHandler handler =
            new TestUtils.TestControllerRequestCompletionHandler(new Some<>(expectedResponse));
        requestThread.enqueue(metadataQueueItem(time, new MetadataRequestData(), handler));
        client.prepareResponse(expectedResponse);

        requestThread.doWork();
        Assertions.assertFalse(handler.completed().get());

        client.setUnreachable(oldController, time.milliseconds() + 5000);
        requestThread.doWork();
        requestThread.doWork();
        requestThread.doWork();
        Assertions.assertTrue(handler.completed().get());
    }

    /**
     * A metadata response carrying NOT_CONTROLLER clears the active controller address; the
     * thread then picks up host2 from the provider and completes the request against it.
     */
    @Test
    public void testNotController() {
        MockTime time = new MockTime();
        MockClient client = newMockClient(time);
        ControllerNodeProvider provider = mockControllerNodeProvider(
            controllerInfo(new Some<>(new Node(1, "host1", 1234))),
            controllerInfo(new Some<>(new Node(2, "host2", 1234))));
        MetadataResponse notControllerResponse = RequestTestUtils.metadataUpdateWith(
            "cluster1",
            2,
            Collections.singletonMap("a", Errors.NOT_CONTROLLER),
            Collections.singletonMap("a", 2));
        MetadataResponse expectedResponse =
            RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2));
        NodeToControllerRequestThread requestThread =
            newStartedRequestThread(client, provider, time, Long.MAX_VALUE);

        TestUtils.TestControllerRequestCompletionHandler handler =
            new TestUtils.TestControllerRequestCompletionHandler(new Some<>(expectedResponse));
        requestThread.enqueue(
            metadataQueueItem(time, new MetadataRequestData().setAllowAutoTopicCreation(true), handler));

        requestThread.doWork();
        Assertions.assertEquals(new Some<>(new Node(1, "host1", 1234)), requestThread.activeControllerAddress());

        // Only answer the auto-topic-creation metadata request that was enqueued above.
        client.prepareResponse(
            request -> request instanceof MetadataRequest && ((MetadataRequest) request).allowAutoTopicCreation(),
            notControllerResponse);
        requestThread.doWork();
        Assertions.assertEquals(None$.MODULE$, requestThread.activeControllerAddress());

        requestThread.doWork();
        client.prepareResponse(expectedResponse);
        requestThread.doWork();
        Assertions.assertEquals(new Some<>(new Node(2, "host2", 1234)), requestThread.activeControllerAddress());
        Assertions.assertTrue(handler.completed().get());
    }

    /**
     * Same scenario as {@link #testNotController()}, but the NOT_CONTROLLER error arrives as the
     * error code of an {@link EnvelopeResponse} answering an {@link EnvelopeRequest}.
     */
    @Test
    public void testEnvelopeResponseWithNotControllerError() {
        MockTime time = new MockTime();
        MockClient client = newMockClient(time);
        client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.ENVELOPE.id, (short) 0, (short) 0));
        ControllerNodeProvider provider = mockControllerNodeProvider(
            controllerInfo(new Some<>(new Node(1, "host1", 1234))),
            controllerInfo(new Some<>(new Node(2, "host2", 1234))));
        EnvelopeResponse notControllerResponse =
            new EnvelopeResponse(new EnvelopeResponseData().setErrorCode(Errors.NOT_CONTROLLER.code()));
        MetadataResponse expectedResponse =
            RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2));
        NodeToControllerRequestThread requestThread =
            newStartedRequestThread(client, provider, time, Long.MAX_VALUE);

        TestUtils.TestControllerRequestCompletionHandler handler =
            new TestUtils.TestControllerRequestCompletionHandler(new Some<>(expectedResponse));
        long createdTimeMs = time.milliseconds();
        // The casts select the (KerberosShortNamer, SslPrincipalMapper) constructor overload.
        DefaultKafkaPrincipalBuilder principalBuilder =
            new DefaultKafkaPrincipalBuilder((KerberosShortNamer) null, (SslPrincipalMapper) null);
        EnvelopeRequest.Builder envelopeBuilder = new EnvelopeRequest.Builder(
            ByteBuffer.allocate(0),
            principalBuilder.serialize(new KafkaPrincipal("User", "principal", true)),
            // Explicit charset so the bytes do not depend on the platform default.
            "client-address".getBytes(StandardCharsets.UTF_8));
        requestThread.enqueue(new NodeToControllerQueueItem(createdTimeMs, envelopeBuilder, handler));

        requestThread.doWork();
        Assertions.assertEquals(new Some<>(new Node(1, "host1", 1234)), requestThread.activeControllerAddress());

        client.prepareResponse(request -> request instanceof EnvelopeRequest, notControllerResponse);
        requestThread.doWork();
        Assertions.assertEquals(None$.MODULE$, requestThread.activeControllerAddress());

        requestThread.doWork();
        client.prepareResponse(expectedResponse);
        requestThread.doWork();
        Assertions.assertEquals(new Some<>(new Node(2, "host2", 1234)), requestThread.activeControllerAddress());
        Assertions.assertTrue(handler.completed().get());
    }

    /**
     * A NOT_CONTROLLER response received after the retry timeout has elapsed results in the
     * handler being told the request timed out.
     */
    @Test
    public void testRetryTimeout() {
        MockTime time = new MockTime();
        MockClient client = newMockClient(time);
        ControllerNodeProvider provider =
            mockControllerNodeProvider(controllerInfo(new Some<>(new Node(1, "host1", 1234))));
        MetadataResponse notControllerResponse = RequestTestUtils.metadataUpdateWith(
            "cluster1",
            2,
            Collections.singletonMap("a", Errors.NOT_CONTROLLER),
            Collections.singletonMap("a", 2));
        NodeToControllerRequestThread requestThread =
            newStartedRequestThread(client, provider, time, RETRY_TIMEOUT_MS);

        // Uses the Scala default value of the handler's constructor argument.
        TestUtils.TestControllerRequestCompletionHandler handler =
            new TestUtils.TestControllerRequestCompletionHandler(
                TestUtils$TestControllerRequestCompletionHandler$.MODULE$.$lessinit$greater$default$1());
        requestThread.enqueue(
            metadataQueueItem(time, new MetadataRequestData().setAllowAutoTopicCreation(true), handler));

        requestThread.doWork();
        time.sleep(RETRY_TIMEOUT_MS);
        client.prepareResponse(
            request -> request instanceof MetadataRequest && ((MetadataRequest) request).allowAutoTopicCreation(),
            notControllerResponse);
        requestThread.doWork();
        Assertions.assertTrue(handler.timedOut().get());
    }

    /**
     * When the client answers the metadata request with an unsupported-version response, the
     * handler is completed with a response whose {@code versionMismatch()} is non-null.
     */
    @Test
    public void testUnsupportedVersionHandling() {
        MockTime time = new MockTime();
        MockClient client = newMockClient(time);
        ControllerNodeProvider provider =
            mockControllerNodeProvider(controllerInfo(new Some<>(new Node(2, "host", 1234))));
        AtomicReference<ClientResponse> callbackResponse = new AtomicReference<>();
        NodeToControllerQueueItem queueItem =
            metadataQueueItem(time, new MetadataRequestData(), capturingHandler(callbackResponse));

        client.prepareUnsupportedVersionResponse(request -> request.apiKey() == ApiKeys.METADATA);
        NodeToControllerRequestThread requestThread =
            newStartedRequestThread(client, provider, time, Long.MAX_VALUE);
        requestThread.enqueue(queueItem);

        pollUntil(requestThread, () -> callbackResponse.get() != null);
        Assertions.assertNotNull(callbackResponse.get().versionMismatch());
    }

    /**
     * When the client has a pending authentication error for the controller node, the handler
     * is completed with a response whose {@code authenticationException()} is non-null.
     */
    @Test
    public void testAuthenticationExceptionHandling() {
        MockTime time = new MockTime();
        MockClient client = newMockClient(time);
        Node controller = new Node(2, "host", 1234);
        ControllerNodeProvider provider = mockControllerNodeProvider(controllerInfo(new Some<>(controller)));
        AtomicReference<ClientResponse> callbackResponse = new AtomicReference<>();
        NodeToControllerQueueItem queueItem =
            metadataQueueItem(time, new MetadataRequestData(), capturingHandler(callbackResponse));

        client.createPendingAuthenticationError(controller, 50L);
        NodeToControllerRequestThread requestThread =
            newStartedRequestThread(client, provider, time, Long.MAX_VALUE);
        requestThread.enqueue(queueItem);

        pollUntil(requestThread, () -> callbackResponse.get() != null);
        Assertions.assertNotNull(callbackResponse.get().authenticationException());
    }

    /**
     * Enqueueing on a thread whose {@code started} flag was never set throws
     * {@link IllegalStateException} and leaves the queue empty.
     */
    @Test
    public void testThreadNotStarted() {
        MockTime time = new MockTime();
        MockClient client = newMockClient(time);
        ControllerNodeProvider provider = mockControllerNodeProvider(emptyControllerInfo());
        NodeToControllerRequestThread requestThread = newRequestThread(client, provider, time, Long.MAX_VALUE);
        NodeToControllerQueueItem queueItem = metadataQueueItem(
            time,
            new MetadataRequestData(),
            new TestUtils.TestControllerRequestCompletionHandler(Option.empty()));

        Assertions.assertThrows(IllegalStateException.class, () -> requestThread.enqueue(queueItem));
        Assertions.assertEquals(0, requestThread.queueSize());
    }

    /** Polls with the default limit of {@link #DEFAULT_MAX_POLLS} iterations. */
    private static void pollUntil(NodeToControllerRequestThread requestThread, BooleanSupplier condition) {
        pollUntil(requestThread, condition, DEFAULT_MAX_POLLS);
    }

    /**
     * Calls {@code doWork()} repeatedly until {@code condition} holds or {@code maxRetries}
     * iterations have run, then fails the test if the condition is still not met.
     *
     * <p>{@code doWork()} always runs at least once, even if {@code maxRetries} is zero or less.
     *
     * @param requestThread thread to drive
     * @param condition     checked after every iteration and once more before failing
     * @param maxRetries    upper bound on the number of iterations
     */
    private static void pollUntil(
            NodeToControllerRequestThread requestThread, BooleanSupplier condition, int maxRetries) {
        int attempts = 0;
        do {
            requestThread.doWork();
            attempts++;
        } while (!condition.getAsBoolean() && attempts < maxRetries);

        if (!condition.getAsBoolean()) {
            Assertions.fail("Condition failed to be met after polling " + attempts + " times");
        }
    }
}
