package kafka.server;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import kafka.utils.TestUtils$TestControllerRequestCompletionHandler$;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.EnvelopeRequest;
import org.apache.kafka.common.requests.EnvelopeResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.reflect.ScalaSignature;

/* compiled from: BrokerToControllerRequestThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00014A!\u0004\b\u0001'!)!\u0004\u0001C\u00017!)a\u0004\u0001C\u0001?!)\u0001\u0007\u0001C\u0001?!)!\u0007\u0001C\u0001?!)A\u0007\u0001C\u0001?!)a\u0007\u0001C\u0001?!)\u0001\b\u0001C\u0001?!)!\b\u0001C\u0001?!)A\b\u0001C\u0001?!)a\b\u0001C\u0001?!)\u0001\t\u0001C\u0005\u0003\"9A\u000bAI\u0001\n\u0013)&a\t\"s_.,'\u000fV8D_:$(o\u001c7mKJ\u0014V-];fgR$\u0006N]3bIR+7\u000f\u001e\u0006\u0003\u001fA\taa]3sm\u0016\u0014(\"A\t\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0006\t\u0003+ai\u0011A\u0006\u0006\u0002/\u0005)1oY1mC&\u0011\u0011D\u0006\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005a\u0002CA\u000f\u0001\u001b\u0005q\u0011a\u000b;fgR\u0014V\r\u001e:z)&lWm\\;u/\"LG.Z\"p]R\u0014x\u000e\u001c7fe:{G/\u0011<bS2\f'\r\\3\u0015\u0003\u0001\u0002\"!F\u0011\n\u0005\t2\"\u0001B+oSRD#A\u0001\u0013\u0011\u0005\u0015rS\"\u0001\u0014\u000b\u0005\u001dB\u0013aA1qS*\u0011\u0011FK\u0001\bUV\u0004\u0018\u000e^3s\u0015\tYC&A\u0003kk:LGOC\u0001.\u0003\ry'oZ\u0005\u0003_\u0019\u0012A\u0001V3ti\u0006\u0001B/Z:u%\u0016\fX/Z:ugN+g\u000e\u001e\u0015\u0003\u0007\u0011\nQ\u0003^3ti\u000e{g\u000e\u001e:pY2,'o\u00115b]\u001e,G\r\u000b\u0002\u0005I\u0005\tB/Z:u\u001d>$8i\u001c8ue>dG.\u001a:)\u0005\u0015!\u0013A\u000b;fgR,eN^3m_B,'+Z:q_:\u001cXmV5uQ:{GoQ8oiJ|G\u000e\\3s\u000bJ\u0014xN\u001d\u0015\u0003\r\u0011\n\u0001\u0003^3tiJ+GO]=US6,w.\u001e;)\u0005\u001d!\u0013A\b;fgR,fn];qa>\u0014H/\u001a3WKJ\u001c\u0018n\u001c8IC:$G.\u001b8hQ\tAA%A\u0012uKN$\u0018)\u001e;iK:$\u0018nY1uS>tW\t_2faRLwN\u001c%b]\u0012d\u0017N\\4)\u0005%!\u0013\u0001\u0006;fgR$\u0006N]3bI:{Go\u0015;beR,G\r\u000b\u0002\u000bI\u0005I\u0001o\u001c7m+:$\u0018\u000e\u001c\u000b\u0005A\t;u\nC\u0003D\u0017\u0001\u0007A)A\u0007sKF,Xm\u001d;UQJ,\u0017\r\u001a\t\u0003;\u0015K!A\u0012\b\u0003?\t\u0013xn[3s)>\u001cuN\u001c;s_2dWM\u001d*fcV,7\u000f\u001e+ie\u0016\fG\rC\u0003I\u0017\u0001\u0007\u0011*A\u0005d_:$\u0017\u000e^5p]B\u0019QC\u0013'\n\u0005-3\"!\u0003$v]\u000e$\u0018n\u001c81!\t)
R*\u0003\u0002O-\t9!i\\8mK\u0006t\u0007b\u0002)\f!\u0003\u0005\r!U\u0001\u000b[\u0006D(+\u001a;sS\u0016\u001c\bCA\u000bS\u0013\t\u0019fCA\u0002J]R\f1\u0003]8mYVsG/\u001b7%I\u00164\u0017-\u001e7uIM*\u0012A\u0016\u0016\u0003#^[\u0013\u0001\u0017\t\u00033zk\u0011A\u0017\u0006\u00037r\u000b\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0005u3\u0012AC1o]>$\u0018\r^5p]&\u0011qL\u0017\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0007")
/* loaded from: input_file:kafka/server/BrokerToControllerRequestThreadTest.class */
public class BrokerToControllerRequestThreadTest {
    /**
     * Checks that a queued request is timed out when no controller ever becomes available.
     *
     * <p>The mocked {@link ControllerNodeProvider} always answers with {@code None}. After the first
     * {@code doWork()} the item must still be queued; once the mock clock has advanced by the same
     * 30000 ms that was handed to the thread as its last constructor argument, the next
     * {@code doWork()} must leave the queue empty and the handler flagged as timed out.
     */
    @Test
    public void testRetryTimeoutWhileControllerNotAvailable() {
        final int timeoutMs = 30000;
        MockTime time = new MockTime();
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = new KafkaConfig(testUtils.createBrokerConfig(
                1,
                "localhost:2181",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));
        Metadata metadata = (Metadata) Mockito.mock(Metadata.class);
        MockClient client = new MockClient(time, metadata);

        // The provider never knows a controller.
        ControllerNodeProvider nodeProvider = (ControllerNodeProvider) Mockito.mock(ControllerNodeProvider.class);
        Mockito.when(nodeProvider.get()).thenReturn(None$.MODULE$);

        ManualMetadataUpdater metadataUpdater = new ManualMetadataUpdater();
        BrokerToControllerRequestThread requestThread =
                new BrokerToControllerRequestThread(client, metadataUpdater, nodeProvider, config, time, "", timeoutMs);
        requestThread.started_$eq(true);

        TestUtils.TestControllerRequestCompletionHandler handler =
                new TestUtils.TestControllerRequestCompletionHandler(None$.MODULE$);
        long createdTimeMs = time.milliseconds();
        MetadataRequest.Builder requestBuilder = new MetadataRequest.Builder(new MetadataRequestData());
        requestThread.enqueue(new BrokerToControllerQueueItem(createdTimeMs, requestBuilder, handler));

        // First pass: nothing to send to, the item stays queued.
        requestThread.doWork();
        Assertions.assertEquals(1, requestThread.queueSize());

        // Second pass after the clock has moved forward: the item is dropped as timed out.
        time.sleep(timeoutMs);
        requestThread.doWork();
        Assertions.assertEquals(0, requestThread.queueSize());
        Assertions.assertTrue(handler.timedOut().get());
    }

    /**
     * Checks the happy path: with a controller known (node 2) and a response prepared on the mock
     * client, an enqueued metadata request leaves the queue after two {@code doWork()} passes and the
     * completion handler is marked completed.
     */
    @Test
    public void testRequestsSent() {
        MockTime time = new MockTime();
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = new KafkaConfig(testUtils.createBrokerConfig(
                1,
                "localhost:2181",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));
        Metadata metadata = (Metadata) Mockito.mock(Metadata.class);
        MockClient client = new MockClient(time, metadata);

        ControllerNodeProvider nodeProvider = (ControllerNodeProvider) Mockito.mock(ControllerNodeProvider.class);
        Mockito.when(nodeProvider.get()).thenReturn(new Some(new Node(2, "host", 1234)));

        MetadataResponse expectedResponse =
                RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));

        ManualMetadataUpdater metadataUpdater = new ManualMetadataUpdater();
        BrokerToControllerRequestThread requestThread =
                new BrokerToControllerRequestThread(client, metadataUpdater, nodeProvider, config, time, "", Long.MAX_VALUE);
        requestThread.started_$eq(true);
        client.prepareResponse(expectedResponse);

        TestUtils.TestControllerRequestCompletionHandler handler =
                new TestUtils.TestControllerRequestCompletionHandler(new Some(expectedResponse));
        long createdTimeMs = time.milliseconds();
        MetadataRequest.Builder requestBuilder = new MetadataRequest.Builder(new MetadataRequestData());
        requestThread.enqueue(new BrokerToControllerQueueItem(createdTimeMs, requestBuilder, handler));
        Assertions.assertEquals(1, requestThread.queueSize());

        // Two passes are run before the queue is expected to be empty.
        requestThread.doWork();
        requestThread.doWork();

        Assertions.assertEquals(0, requestThread.queueSize());
        Assertions.assertTrue(handler.completed().get());
    }

    /**
     * Checks that a request still completes when the controller changes between polls.
     *
     * <p>The mocked provider reports node 1 on its first call and node 2 afterwards. After one
     * {@code doWork()} the handler must not be completed yet; node 1 is then made unreachable on the
     * mock client for 5000 ms, and after three further {@code doWork()} passes the handler must be
     * completed.
     */
    @Test
    public void testControllerChanged() {
        MockTime time = new MockTime();
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = new KafkaConfig(testUtils.createBrokerConfig(
                1,
                "localhost:2181",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));
        Metadata metadata = (Metadata) Mockito.mock(Metadata.class);
        MockClient client = new MockClient(time, metadata);

        ControllerNodeProvider nodeProvider = (ControllerNodeProvider) Mockito.mock(ControllerNodeProvider.class);
        Node oldController = new Node(1, "host1", 1234);
        Node newController = new Node(2, "host2", 1234);
        // First lookup yields the old controller, every later lookup the new one.
        Mockito.when(nodeProvider.get()).thenReturn(new Some(oldController), new Option[]{new Some(newController)});

        MetadataResponse expectedResponse =
                RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));

        ManualMetadataUpdater metadataUpdater = new ManualMetadataUpdater();
        BrokerToControllerRequestThread requestThread =
                new BrokerToControllerRequestThread(client, metadataUpdater, nodeProvider, config, time, "", Long.MAX_VALUE);
        requestThread.started_$eq(true);

        TestUtils.TestControllerRequestCompletionHandler handler =
                new TestUtils.TestControllerRequestCompletionHandler(new Some(expectedResponse));
        long createdTimeMs = time.milliseconds();
        MetadataRequest.Builder requestBuilder = new MetadataRequest.Builder(new MetadataRequestData());
        requestThread.enqueue(new BrokerToControllerQueueItem(createdTimeMs, requestBuilder, handler));
        client.prepareResponse(expectedResponse);

        requestThread.doWork();
        Assertions.assertFalse(handler.completed().get());

        // Cut off the old controller, then give the thread three passes to finish the request.
        long unreachableUntilMs = time.milliseconds() + 5000;
        client.setUnreachable(oldController, unreachableUntilMs);
        for (int pass = 0; pass < 3; pass++) {
            requestThread.doWork();
        }
        Assertions.assertTrue(handler.completed().get());
    }

    /**
     * Checks how the thread reacts to a metadata response built with a {@code NOT_CONTROLLER} error.
     *
     * <p>Sequence asserted by the test: after the first {@code doWork()} the active controller is
     * node 1; after the error response has been prepared and another {@code doWork()} has run, the
     * active controller is {@code None}; after one more {@code doWork()}, a second prepared response
     * and a final {@code doWork()}, the active controller is node 2 and the handler is completed.
     */
    @Test
    public void testNotController() {
        MockTime time = new MockTime();
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = new KafkaConfig(testUtils.createBrokerConfig(
                1,
                "localhost:2181",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));
        Metadata metadata = (Metadata) Mockito.mock(Metadata.class);
        MockClient client = new MockClient(time, metadata);

        ControllerNodeProvider nodeProvider = (ControllerNodeProvider) Mockito.mock(ControllerNodeProvider.class);
        Node oldController = new Node(1, "host1", 1234);
        Node newController = new Node(2, "host2", 1234);
        // First lookup yields the old controller, every later lookup the new one.
        Mockito.when(nodeProvider.get()).thenReturn(new Some(oldController), new Option[]{new Some(newController)});

        MetadataResponse notControllerResponse = RequestTestUtils.metadataUpdateWith(
                "cluster1",
                2,
                Collections.singletonMap("a", Errors.NOT_CONTROLLER),
                Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));
        MetadataResponse expectedResponse =
                RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));

        ManualMetadataUpdater metadataUpdater = new ManualMetadataUpdater();
        BrokerToControllerRequestThread requestThread =
                new BrokerToControllerRequestThread(client, metadataUpdater, nodeProvider, config, time, "", Long.MAX_VALUE);
        requestThread.started_$eq(true);

        TestUtils.TestControllerRequestCompletionHandler handler =
                new TestUtils.TestControllerRequestCompletionHandler(new Some(expectedResponse));
        long createdTimeMs = time.milliseconds();
        MetadataRequest.Builder requestBuilder =
                new MetadataRequest.Builder(new MetadataRequestData().setAllowAutoTopicCreation(true));
        requestThread.enqueue(new BrokerToControllerQueueItem(createdTimeMs, requestBuilder, handler));

        // Initial pass: the thread picks up the first controller.
        requestThread.doWork();
        Assertions.assertEquals(new Some(oldController), requestThread.activeControllerAddress());

        // The error response is only matched against a MetadataRequest that allows auto topic creation.
        client.prepareResponse(
                request -> (request instanceof MetadataRequest) && ((MetadataRequest) request).allowAutoTopicCreation(),
                notControllerResponse);
        requestThread.doWork();
        Assertions.assertEquals(None$.MODULE$, requestThread.activeControllerAddress());

        // One more pass, then the successful response is prepared and delivered on the final pass.
        requestThread.doWork();
        client.prepareResponse(expectedResponse);
        requestThread.doWork();
        Assertions.assertEquals(new Some(newController), requestThread.activeControllerAddress());
        Assertions.assertTrue(handler.completed().get());
    }

    /**
     * Checks how the thread reacts to an envelope response whose error code is
     * {@code NOT_CONTROLLER}.
     *
     * <p>Sequence asserted by the test: after the first {@code doWork()} the active controller is
     * node 1; after the error envelope response has been prepared and another {@code doWork()} has
     * run, the active controller is {@code None}; after one more {@code doWork()}, a prepared
     * metadata response and a final {@code doWork()}, the active controller is node 2 and the
     * handler is completed.
     */
    @Test
    public void testEnvelopeResponseWithNotControllerError() {
        MockTime mockTime = new MockTime();
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig kafkaConfig = new KafkaConfig(testUtils.createBrokerConfig(
                1,
                "localhost:2181",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));
        MockClient mockClient = new MockClient(mockTime, (Metadata) Mockito.mock(Metadata.class));
        // Advertise ENVELOPE at version 0 only on the mock client.
        mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.ENVELOPE.id, (short) 0, (short) 0));

        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider) Mockito.mock(ControllerNodeProvider.class);
        Node oldController = new Node(1, "host1", 1234);
        Node newController = new Node(2, "host2", 1234);
        // First lookup yields the old controller, every later lookup the new one.
        Mockito.when(controllerNodeProvider.get())
                .thenReturn(new Some(oldController), new Option[]{new Some(newController)});

        EnvelopeResponse notControllerResponse =
                new EnvelopeResponse(new EnvelopeResponseData().setErrorCode(Errors.NOT_CONTROLLER.code()));
        MetadataResponse expectedResponse =
                RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));

        BrokerToControllerRequestThread requestThread = new BrokerToControllerRequestThread(
                mockClient, new ManualMetadataUpdater(), controllerNodeProvider, kafkaConfig, mockTime, "", Long.MAX_VALUE);
        requestThread.started_$eq(true);

        TestUtils.TestControllerRequestCompletionHandler completionHandler =
                new TestUtils.TestControllerRequestCompletionHandler(new Some(expectedResponse));
        // The client-address bytes are encoded with an explicit charset so the request payload does not
        // depend on the JVM's platform default encoding.
        requestThread.enqueue(new BrokerToControllerQueueItem(
                mockTime.milliseconds(),
                new EnvelopeRequest.Builder(
                        ByteBuffer.allocate(0),
                        new DefaultKafkaPrincipalBuilder((KerberosShortNamer) null, (SslPrincipalMapper) null)
                                .serialize(new KafkaPrincipal("User", "principal", true)),
                        "client-address".getBytes(StandardCharsets.UTF_8)),
                completionHandler));

        // Initial pass: the thread picks up the first controller.
        requestThread.doWork();
        Assertions.assertEquals(new Some(oldController), requestThread.activeControllerAddress());

        // The error response is matched against any EnvelopeRequest.
        mockClient.prepareResponse(request -> request instanceof EnvelopeRequest, notControllerResponse);
        requestThread.doWork();
        Assertions.assertEquals(None$.MODULE$, requestThread.activeControllerAddress());

        // One more pass, then the successful response is prepared and delivered on the final pass.
        requestThread.doWork();
        mockClient.prepareResponse(expectedResponse);
        requestThread.doWork();
        Assertions.assertEquals(new Some(newController), requestThread.activeControllerAddress());
        Assertions.assertTrue(completionHandler.completed().get());
    }

    /**
     * Checks that a request is timed out when the controller keeps being reported but the prepared
     * response is built with a {@code NOT_CONTROLLER} error.
     *
     * <p>The thread is created with 30000 as its last constructor argument. After one
     * {@code doWork()}, the mock clock is advanced by 30000 ms, the error response is prepared, and
     * a second {@code doWork()} must leave the handler flagged as timed out.
     */
    @Test
    public void testRetryTimeout() {
        final int timeoutMs = 30000;
        MockTime time = new MockTime();
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = new KafkaConfig(testUtils.createBrokerConfig(
                1,
                "localhost:2181",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));
        Metadata metadata = (Metadata) Mockito.mock(Metadata.class);
        MockClient client = new MockClient(time, metadata);

        ControllerNodeProvider nodeProvider = (ControllerNodeProvider) Mockito.mock(ControllerNodeProvider.class);
        Mockito.when(nodeProvider.get()).thenReturn(new Some(new Node(1, "host1", 1234)));

        MetadataResponse notControllerResponse = RequestTestUtils.metadataUpdateWith(
                "cluster1",
                2,
                Collections.singletonMap("a", Errors.NOT_CONTROLLER),
                Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));

        ManualMetadataUpdater metadataUpdater = new ManualMetadataUpdater();
        BrokerToControllerRequestThread requestThread =
                new BrokerToControllerRequestThread(client, metadataUpdater, nodeProvider, config, time, "", timeoutMs);
        requestThread.started_$eq(true);

        // The handler is built with the default value of its constructor argument.
        TestUtils.TestControllerRequestCompletionHandler handler = new TestUtils.TestControllerRequestCompletionHandler(
                TestUtils$TestControllerRequestCompletionHandler$.MODULE$.$lessinit$greater$default$1());
        long createdTimeMs = time.milliseconds();
        MetadataRequest.Builder requestBuilder =
                new MetadataRequest.Builder(new MetadataRequestData().setAllowAutoTopicCreation(true));
        requestThread.enqueue(new BrokerToControllerQueueItem(createdTimeMs, requestBuilder, handler));

        requestThread.doWork();

        // Advance the clock before the error response is made available.
        time.sleep(timeoutMs);
        client.prepareResponse(
                request -> (request instanceof MetadataRequest) && ((MetadataRequest) request).allowAutoTopicCreation(),
                notControllerResponse);
        requestThread.doWork();

        Assertions.assertTrue(handler.timedOut().get());
    }

    /**
     * Checks that an unsupported-version failure is delivered to the completion handler.
     *
     * <p>The mock client is primed with an unsupported-version response for requests whose API key
     * is {@code METADATA}. The request thread is polled (up to the default retry count of
     * {@code pollUntil}) until the handler's {@code onComplete} has stored a response, and that
     * response must report a non-null {@code versionMismatch()}. A timeout callback fails the test.
     */
    @Test
    public void testUnsupportedVersionHandling() {
        MockTime mockTime = new MockTime();
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig kafkaConfig = new KafkaConfig(testUtils.createBrokerConfig(
                1,
                "localhost:2181",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));
        MockClient mockClient = new MockClient(mockTime, (Metadata) Mockito.mock(Metadata.class));
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider) Mockito.mock(ControllerNodeProvider.class);
        Mockito.when(controllerNodeProvider.get()).thenReturn(new Some(new Node(2, "host", 1234)));

        // Holds whatever response the handler receives; stays null until onComplete fires.
        final AtomicReference<ClientResponse> callbackResponse = new AtomicReference<>();
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(
                mockTime.milliseconds(),
                new MetadataRequest.Builder(new MetadataRequestData()),
                new ControllerRequestCompletionHandler() {
                    @Override
                    public void onTimeout() {
                        Assertions.fail("Unexpected timeout exception");
                    }

                    @Override
                    public void onComplete(ClientResponse clientResponse) {
                        callbackResponse.set(clientResponse);
                    }
                });

        // Null-safe comparison: a null API key never matches.
        mockClient.prepareUnsupportedVersionResponse(request -> ApiKeys.METADATA.equals(request.apiKey()));

        BrokerToControllerRequestThread requestThread = new BrokerToControllerRequestThread(
                mockClient, new ManualMetadataUpdater(), controllerNodeProvider, kafkaConfig, mockTime, "", Long.MAX_VALUE);
        requestThread.started_$eq(true);
        requestThread.enqueue(queueItem);

        // Reuse the shared polling helper instead of repeating its loop inline.
        pollUntil(requestThread, () -> callbackResponse.get() != null, pollUntil$default$3());
        Assertions.assertNotNull(callbackResponse.get().versionMismatch());
    }

    /**
     * Checks that an authentication failure is delivered to the completion handler.
     *
     * <p>A pending authentication error (with the value {@code 50L}) is created on the mock client
     * for the controller node. The request thread is polled (up to the default retry count of
     * {@code pollUntil}) until the handler's {@code onComplete} has stored a response, and that
     * response must report a non-null {@code authenticationException()}. A timeout callback fails
     * the test.
     */
    @Test
    public void testAuthenticationExceptionHandling() {
        MockTime mockTime = new MockTime();
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig kafkaConfig = new KafkaConfig(testUtils.createBrokerConfig(
                1,
                "localhost:2181",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));
        MockClient mockClient = new MockClient(mockTime, (Metadata) Mockito.mock(Metadata.class));
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider) Mockito.mock(ControllerNodeProvider.class);
        Node controller = new Node(2, "host", 1234);
        Mockito.when(controllerNodeProvider.get()).thenReturn(new Some(controller));

        // Holds whatever response the handler receives; stays null until onComplete fires.
        final AtomicReference<ClientResponse> callbackResponse = new AtomicReference<>();
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(
                mockTime.milliseconds(),
                new MetadataRequest.Builder(new MetadataRequestData()),
                new ControllerRequestCompletionHandler() {
                    @Override
                    public void onTimeout() {
                        Assertions.fail("Unexpected timeout exception");
                    }

                    @Override
                    public void onComplete(ClientResponse clientResponse) {
                        callbackResponse.set(clientResponse);
                    }
                });

        mockClient.createPendingAuthenticationError(controller, 50L);

        BrokerToControllerRequestThread requestThread = new BrokerToControllerRequestThread(
                mockClient, new ManualMetadataUpdater(), controllerNodeProvider, kafkaConfig, mockTime, "", Long.MAX_VALUE);
        requestThread.started_$eq(true);
        requestThread.enqueue(queueItem);

        // Reuse the shared polling helper instead of repeating its loop inline.
        pollUntil(requestThread, () -> callbackResponse.get() != null, pollUntil$default$3());
        Assertions.assertNotNull(callbackResponse.get().authenticationException());
    }

    /**
     * Checks that enqueueing on a request thread whose {@code started} flag was never set throws
     * {@link IllegalStateException} and leaves the queue empty.
     */
    @Test
    public void testThreadNotStarted() {
        MockTime time = new MockTime();
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = new KafkaConfig(testUtils.createBrokerConfig(
                1,
                "localhost:2181",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        Metadata metadata = (Metadata) Mockito.mock(Metadata.class);
        MockClient client = new MockClient(time, metadata);
        ManualMetadataUpdater metadataUpdater = new ManualMetadataUpdater();
        ControllerNodeProvider nodeProvider = (ControllerNodeProvider) Mockito.mock(ControllerNodeProvider.class);
        // Note: started_$eq(true) is deliberately not called on this thread.
        BrokerToControllerRequestThread requestThread =
                new BrokerToControllerRequestThread(client, metadataUpdater, nodeProvider, config, time, "", Long.MAX_VALUE);

        long createdTimeMs = time.milliseconds();
        MetadataRequest.Builder requestBuilder = new MetadataRequest.Builder(new MetadataRequestData());
        TestUtils.TestControllerRequestCompletionHandler handler =
                new TestUtils.TestControllerRequestCompletionHandler(None$.MODULE$);
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(createdTimeMs, requestBuilder, handler);

        Assertions.assertThrows(IllegalStateException.class, () -> requestThread.enqueue(queueItem));
        Assertions.assertEquals(0, requestThread.queueSize());
    }

    /**
     * Repeatedly runs {@code doWork()} on the given request thread until the condition holds or the
     * iteration budget is used up, then fails the test if the condition still does not hold.
     *
     * <p>At least one {@code doWork()} pass always runs, even when {@code i} is zero or negative.
     * The condition is evaluated once after every pass and once more after the loop.
     *
     * @param brokerToControllerRequestThread thread whose {@code doWork()} is driven
     * @param function0 condition to wait for; evaluated through its boolean-specialised apply
     * @param i maximum number of {@code doWork()} passes
     */
    private void pollUntil(BrokerToControllerRequestThread brokerToControllerRequestThread, Function0<Object> function0, int i) {
        int attempts = 0;
        boolean satisfied;
        do {
            brokerToControllerRequestThread.doWork();
            attempts++;
            satisfied = function0.apply$mcZ$sp();
        } while (!satisfied && attempts < i);

        // Re-check after the loop so the final verdict reflects the condition's current state.
        if (!function0.apply$mcZ$sp()) {
            Assertions.fail("Condition failed to be met after polling " + attempts + " times");
        }
    }

    /**
     * Supplies the default for the third parameter of {@code pollUntil}.
     *
     * @return the default maximum number of polling passes, 10
     */
    private int pollUntil$default$3() {
        final int defaultMaxPasses = 10;
        return defaultMaxPasses;
    }

    /**
     * Polling condition for {@code testUnsupportedVersionHandling}: reports whether the reference
     * currently holds a value.
     *
     * @param atomicReference reference to inspect
     * @return {@code true} when the referenced value is non-null
     */
    public static final /* synthetic */ boolean $anonfun$testUnsupportedVersionHandling$2(AtomicReference atomicReference) {
        Object current = atomicReference.get();
        if (current == null) {
            return false;
        }
        return true;
    }

    /**
     * Polling condition for {@code testAuthenticationExceptionHandling}: reports whether the
     * reference currently holds a value.
     *
     * @param atomicReference reference to inspect
     * @return {@code true} when the referenced value is non-null
     */
    public static final /* synthetic */ boolean $anonfun$testAuthenticationExceptionHandling$1(AtomicReference atomicReference) {
        Object current = atomicReference.get();
        if (current == null) {
            return false;
        }
        return true;
    }
}
