package kafka.server;

import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: BrokerToControllerRequestThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001!2A!\u0002\u0004\u0001\u0017!)!\u0003\u0001C\u0001'!)a\u0003\u0001C\u0001/!)A\u0005\u0001C\u0001/!)a\u0005\u0001C\u0001/\t\u0019#I]8lKJ$vnQ8oiJ|G\u000e\\3s%\u0016\fX/Z:u)\"\u0014X-\u00193UKN$(BA\u0004\t\u0003\u0019\u0019XM\u001d<fe*\t\u0011\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001a\u0001CA\u0007\u0011\u001b\u0005q!\"A\b\u0002\u000bM\u001c\u0017\r\\1\n\u0005Eq!AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002)A\u0011Q\u0003A\u0007\u0002\r\u0005\u0001B/Z:u%\u0016\fX/Z:ugN+g\u000e\u001e\u000b\u00021A\u0011Q\"G\u0005\u000359\u0011A!\u00168ji\"\u0012!\u0001\b\t\u0003;\tj\u0011A\b\u0006\u0003?\u0001\nQA[;oSRT\u0011!I\u0001\u0004_J<\u0017BA\u0012\u001f\u0005\u0011!Vm\u001d;\u0002+Q,7\u000f^\"p]R\u0014x\u000e\u001c7fe\u000eC\u0017M\\4fI\"\u00121\u0001H\u0001\u0012i\u0016\u001cHOT8u\u0007>tGO]8mY\u0016\u0014\bF\u0001\u0003\u001d\u0001")
/* loaded from: input_file:kafka/server/BrokerToControllerRequestThreadTest.class */
// Tests for BrokerToControllerRequestThread. Each test drives the thread synchronously by calling
// doWork() directly on the test thread, using a MockClient for the network layer and a
// Mockito-mocked MetadataCache to report which broker is the controller.
public class BrokerToControllerRequestThreadTest {

    /**
     * Builds the {@link KafkaConfig} shared by every test: broker id 1, ZooKeeper connect string
     * {@code localhost:2181}, and the Scala default value for every remaining parameter of
     * {@code TestUtils.createBrokerConfig}.
     */
    private static KafkaConfig newBrokerConfig() {
        return new KafkaConfig(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
    }

    /**
     * Creates a broker exposing a single PLAINTEXT endpoint on port 1234, with no rack and empty
     * supported features.
     *
     * @param id           broker id
     * @param host         host name advertised by the endpoint
     * @param listenerName listener name attached to the endpoint
     * @return the new broker
     */
    private static Broker newPlaintextBroker(int id, String host, ListenerName listenerName) {
        return new Broker(id, new $colon.colon(new EndPoint(host, 1234, listenerName, SecurityProtocol.PLAINTEXT), Nil$.MODULE$), None$.MODULE$, Features.emptySupportedFeatures());
    }

    /**
     * Queues one metadata request while broker 2 is reported as the controller, and checks that
     * the completion callback receives the prepared response after two {@code doWork()} passes.
     *
     * @throws InterruptedException if interrupted while enqueueing or awaiting the callback
     */
    @Test
    public void testRequestsSent() throws InterruptedException {
        SystemTime time = new SystemTime();
        KafkaConfig config = newBrokerConfig();
        MockClient mockClient = new MockClient(time, (Metadata) Mockito.mock(Metadata.class));
        LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue = new LinkedBlockingDeque<>();
        MetadataCache metadataCache = (MetadataCache) Mockito.mock(MetadataCache.class);
        ListenerName listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
        Broker activeController = newPlaintextBroker(2, "host", listenerName);
        Mockito.when(metadataCache.getControllerId()).thenReturn(new Some(BoxesRunTime.boxToInteger(2)));
        Mockito.when(metadataCache.getAliveBrokers()).thenReturn(new $colon.colon(activeController, Nil$.MODULE$));
        Mockito.when(metadataCache.getAliveBroker(2)).thenReturn(new Some(activeController));
        MetadataResponse expectedResponse = TestUtils.metadataUpdateWith(2, Collections.singletonMap("a", Integer.valueOf(2)));
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), requestQueue, metadataCache, config, listenerName, time, "");
        mockClient.prepareResponse(expectedResponse);
        CountDownLatch responseLatch = new CountDownLatch(1);
        requestQueue.put(new BrokerToControllerQueueItem(new MetadataRequest.Builder(new MetadataRequestData()), response -> {
            Assert.assertEquals(expectedResponse, response.responseBody());
            responseLatch.countDown();
        }));
        // Two passes are driven before the callback is expected to have run.
        testRequestThread.doWork();
        testRequestThread.doWork();
        Assert.assertTrue(responseLatch.await(10L, TimeUnit.SECONDS));
    }

    /**
     * Stubs the metadata cache to report controller id 1 first and 2 on later calls, marks broker
     * 1's node unreachable on the mock client, and checks that the queued request stays in the
     * queue across the first two {@code doWork()} passes and that its callback eventually
     * receives the prepared response.
     *
     * @throws InterruptedException if interrupted while enqueueing or awaiting the callback
     */
    @Test
    public void testControllerChanged() throws InterruptedException {
        SystemTime time = new SystemTime();
        KafkaConfig config = newBrokerConfig();
        MockClient mockClient = new MockClient(time, (Metadata) Mockito.mock(Metadata.class));
        LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue = new LinkedBlockingDeque<>();
        MetadataCache metadataCache = (MetadataCache) Mockito.mock(MetadataCache.class);
        ListenerName listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
        Broker oldController = newPlaintextBroker(1, "host1", listenerName);
        Node oldControllerNode = oldController.node(listenerName);
        Broker newController = newPlaintextBroker(2, "host2", listenerName);
        // First lookup yields controller 1; every subsequent lookup yields controller 2.
        Mockito.when(metadataCache.getControllerId()).thenReturn(new Some(BoxesRunTime.boxToInteger(1)), new Option[]{new Some(BoxesRunTime.boxToInteger(2))});
        Mockito.when(metadataCache.getAliveBroker(1)).thenReturn(new Some(oldController));
        Mockito.when(metadataCache.getAliveBroker(2)).thenReturn(new Some(newController));
        Mockito.when(metadataCache.getAliveBrokers()).thenReturn(new $colon.colon(oldController, new $colon.colon(newController, Nil$.MODULE$)));
        MetadataResponse expectedResponse = TestUtils.metadataUpdateWith(3, Collections.singletonMap("a", Integer.valueOf(2)));
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), requestQueue, metadataCache, config, listenerName, time, "");
        CountDownLatch responseLatch = new CountDownLatch(1);
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(new MetadataRequest.Builder(new MetadataRequestData()), response -> {
            Assert.assertEquals(expectedResponse, response.responseBody());
            responseLatch.countDown();
        });
        requestQueue.put(queueItem);
        mockClient.prepareResponse(expectedResponse);
        testRequestThread.doWork();
        // After the first pass the item must still be the only entry in the queue.
        Assert.assertFalse(requestQueue.isEmpty());
        Assert.assertEquals(1L, requestQueue.size());
        Assert.assertEquals(queueItem, requestQueue.peek());
        // Make broker 1's node unreachable for the next five seconds.
        mockClient.setUnreachable(oldControllerNode, time.milliseconds() + 5000);
        testRequestThread.doWork();
        Assert.assertFalse(requestQueue.isEmpty());
        Assert.assertEquals(1L, requestQueue.size());
        testRequestThread.doWork();
        testRequestThread.doWork();
        Assert.assertTrue(responseLatch.await(10L, TimeUnit.SECONDS));
    }

    /**
     * Prepares a first response built with {@code Errors.NOT_CONTROLLER} for topic "a", followed
     * by a second response built without a topic error, and checks that the completion callback
     * is invoked with the second response.
     *
     * @throws InterruptedException if interrupted while enqueueing or awaiting the callback
     */
    @Test
    public void testNotController() throws InterruptedException {
        SystemTime time = new SystemTime();
        KafkaConfig config = newBrokerConfig();
        MockClient mockClient = new MockClient(time, (Metadata) Mockito.mock(Metadata.class));
        LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue = new LinkedBlockingDeque<>();
        MetadataCache metadataCache = (MetadataCache) Mockito.mock(MetadataCache.class);
        ListenerName listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
        Broker oldController = newPlaintextBroker(1, "host1", listenerName);
        Broker newController = newPlaintextBroker(2, "host2", listenerName);
        // First lookup yields controller 1; every subsequent lookup yields controller 2.
        Mockito.when(metadataCache.getControllerId()).thenReturn(new Some(BoxesRunTime.boxToInteger(1)), new Option[]{new Some(BoxesRunTime.boxToInteger(2))});
        Mockito.when(metadataCache.getAliveBrokers()).thenReturn(new $colon.colon(oldController, new $colon.colon(newController, Nil$.MODULE$)));
        Mockito.when(metadataCache.getAliveBroker(1)).thenReturn(new Some(oldController));
        Mockito.when(metadataCache.getAliveBroker(2)).thenReturn(new Some(newController));
        MetadataResponse notControllerResponse = TestUtils.metadataUpdateWith("cluster1", 2, Collections.singletonMap("a", Errors.NOT_CONTROLLER), Collections.singletonMap("a", Integer.valueOf(2)));
        MetadataResponse expectedResponse = TestUtils.metadataUpdateWith(3, Collections.singletonMap("a", Integer.valueOf(2)));
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), requestQueue, metadataCache, config, listenerName, time, "");
        CountDownLatch responseLatch = new CountDownLatch(1);
        requestQueue.put(new BrokerToControllerQueueItem(new MetadataRequest.Builder(new MetadataRequestData().setAllowAutoTopicCreation(true)), response -> {
            Assert.assertEquals(expectedResponse, response.responseBody());
            responseLatch.countDown();
        }));
        testRequestThread.doWork();
        // The NOT_CONTROLLER response is only matched to a MetadataRequest that allows
        // auto topic creation, i.e. the request queued above.
        mockClient.prepareResponse(request -> {
            return (request instanceof MetadataRequest) && ((MetadataRequest) request).allowAutoTopicCreation();
        }, notControllerResponse);
        testRequestThread.doWork();
        testRequestThread.doWork();
        mockClient.prepareResponse(expectedResponse);
        testRequestThread.doWork();
        Assert.assertTrue(responseLatch.await(10L, TimeUnit.SECONDS));
    }
}
