package kafka.server;

import java.util.List;
import java.util.Properties;
import kafka.api.LeaderAndIsr$;
import kafka.cluster.Broker;
import kafka.controller.ControlMetadataBatch;
import kafka.controller.ControlMetadataBatchResult;
import kafka.controller.ControllerChannelManager;
import kafka.controller.ControllerChannelManager$;
import kafka.controller.ControllerContext;
import kafka.controller.LeaderAndIsrBatch;
import kafka.controller.StateChangeLogger;
import kafka.utils.TestUtils$;
import kafka.zk.KafkaZkClient;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversable;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

/* compiled from: LeaderElectionTest.scala */
@ScalaSignature(bytes = "\u0006\u0001e3A!\u0004\b\u0001'!)!\u0004\u0001C\u00017!9a\u0004\u0001b\u0001\n\u0003y\u0002B\u0002\u0014\u0001A\u0003%\u0001\u0005C\u0004(\u0001\t\u0007I\u0011A\u0010\t\r!\u0002\u0001\u0015!\u0003!\u0011\u001dI\u0003\u00011A\u0005\u0002)BqA\u000f\u0001A\u0002\u0013\u00051\b\u0003\u0004B\u0001\u0001\u0006Ka\u000b\u0005\u0006\u0005\u0002!\te\u0011\u0005\u0006\u001b\u0002!\te\u0011\u0005\u0006%\u0002!\ta\u0011\u0005\u0006/\u0002!\ta\u0011\u0002\u0013\u0019\u0016\fG-\u001a:FY\u0016\u001cG/[8o)\u0016\u001cHO\u0003\u0002\u0010!\u000511/\u001a:wKJT\u0011!E\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001A\u0003\u0005\u0002\u001615\taC\u0003\u0002\u0018!\u0005\u0011!p[\u0005\u00033Y\u0011ACW8p\u0017\u0016,\u0007/\u001a:UKN$\b*\u0019:oKN\u001c\u0018A\u0002\u001fj]&$h\bF\u0001\u001d!\ti\u0002!D\u0001\u000f\u0003%\u0011'o\\6fe&#\u0017'F\u0001!!\t\tC%D\u0001#\u0015\u0005\u0019\u0013!B:dC2\f\u0017BA\u0013#\u0005\rIe\u000e^\u0001\u000bEJ|7.\u001a:JIF\u0002\u0013!\u00032s_.,'/\u001333\u0003)\u0011'o\\6fe&#'\u0007I\u0001\bg\u0016\u0014h/\u001a:t+\u0005Y\u0003c\u0001\u00175o9\u0011QF\r\b\u0003]Ej\u0011a\f\u0006\u0003aI\ta\u0001\u0010:p_Rt\u0014\"A\u0012\n\u0005M\u0012\u0013a\u00029bG.\fw-Z\u0005\u0003kY\u00121aU3r\u0015\t\u0019$\u0005\u0005\u0002\u001eq%\u0011\u0011H\u0004\u0002\f\u0017\u000647.Y*feZ,'/A\u0006tKJ4XM]:`I\u0015\fHC\u0001\u001f@!\t\tS(\u0003\u0002?E\t!QK\\5u\u0011\u001d\u0001u!!AA\u0002-\n1\u0001\u001f\u00132\u0003!\u0019XM\u001d<feN\u0004\u0013!B:fiV\u0003H#\u0001\u001f)\u0005%)\u0005C\u0001$L\u001b\u00059%B\u0001%J\u0003\u0015QWO\\5u\u0015\u0005Q\u0015aA8sO&\u0011Aj\u0012\u0002\u0007\u0005\u00164wN]3\u0002\u0011Q,\u0017M\u001d#po:D#AC(\u0011\u0005\u0019\u0003\u0016BA)H\u0005\u0015\te\r^3s\u0003i!Xm\u001d;MK\u0006$WM]#mK\u000e$\u0018n\u001c8B]\u0012,\u0005o\\2iQ\tYA\u000b\u0005\u0002G+&\u0011ak\u0012\u0002\u0005)\u0016\u001cH/\u0001\u0016uKN$H*Z1eKJ,E.Z2uS>tw+\u001b;i'R\fG.Z\"p]R\u0014x\u000e\u001c7fe\u0016\u0003xn\u00195)\u00051!\u0006")
/* loaded from: input_file:kafka/server/LeaderElectionTest.class */
public class LeaderElectionTest extends ZooKeeperTestHarness {
    private final int brokerId1 = 0;
    private final int brokerId2 = 1;
    private Seq<KafkaServer> servers = Nil$.MODULE$;

    public int brokerId1() {
        return this.brokerId1;
    }

    public int brokerId2() {
        return this.brokerId2;
    }

    public Seq<KafkaServer> servers() {
        return this.servers;
    }

    public void servers_$eq(Seq<KafkaServer> seq) {
        this.servers = seq;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    @Before
    public void setUp() {
        super.setUp();
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(brokerId1(), zkConnect(), false, TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        Properties createBrokerConfig2 = TestUtils$.MODULE$.createBrokerConfig(brokerId2(), zkConnect(), false, TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        createBrokerConfig.put("unclean.leader.election.enable", "true");
        createBrokerConfig2.put("unclean.leader.election.enable", "true");
        servers_$eq((Seq) servers().$plus$plus(new $colon.colon(TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(createBrokerConfig), TestUtils$.MODULE$.createServer$default$2()), new $colon.colon(TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(createBrokerConfig2), TestUtils$.MODULE$.createServer$default$2()), Nil$.MODULE$)), Seq$.MODULE$.canBuildFrom()));
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    @After
    public void tearDown() {
        TestUtils$.MODULE$.shutdownServers(servers());
        super.tearDown();
    }

    @Test
    public void testLeaderElectionAndEpoch() {
        TestUtils$.MODULE$.waitUntilBrokerMetadataIsPropagated(servers(), TestUtils$.MODULE$.waitUntilBrokerMetadataIsPropagated$default$2());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        KafkaZkClient zkClient = zkClient();
        Map$ Map = Predef$.MODULE$.Map();
        Predef$ predef$ = Predef$.MODULE$;
        Tuple2[] tuple2Arr = new Tuple2[1];
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0));
        GenTraversable apply = Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1}));
        if (predef$ArrowAssoc$ == null) {
            throw null;
        }
        tuple2Arr[0] = new Tuple2(ArrowAssoc, apply);
        int unboxToInt = BoxesRunTime.unboxToInt(testUtils$.createTopic(zkClient, "new-topic", (Map) Map.apply(predef$.wrapRefArray(tuple2Arr)), servers()).apply(BoxesRunTime.boxToInteger(0)));
        int unboxToInt2 = BoxesRunTime.unboxToInt(zkClient().getEpochForPartition(new TopicPartition("new-topic", 0)).get());
        Assert.assertTrue("Leader should be broker 0", unboxToInt == 0);
        Assert.assertEquals("First epoch value should be 0", 0L, unboxToInt2);
        ((KafkaServer) servers().head()).shutdown();
        int waitUntilLeaderIsElectedOrChanged = TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), new Some<>(BoxesRunTime.boxToInteger(unboxToInt)), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        int unboxToInt3 = BoxesRunTime.unboxToInt(zkClient().getEpochForPartition(new TopicPartition("new-topic", 0)).get());
        Assert.assertEquals("Leader must move to broker 1", 1L, waitUntilLeaderIsElectedOrChanged);
        if (Predef$.MODULE$ == null) {
            throw null;
        }
        Assert.assertEquals(new StringOps("Second epoch value should be %d").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(unboxToInt2 + 2)})), unboxToInt2 + 2, unboxToInt3);
        ((KafkaServer) servers().head()).startup();
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testLeaderElectionAndEpoch$1(this, "new-topic", 0)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                throw Assertions$.MODULE$.fail($anonfun$testLeaderElectionAndEpoch$3(), new Position("TestUtils.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 877));
            }
            RichLong$ richLong$ = RichLong$.MODULE$;
            if (Predef$.MODULE$ == null) {
                throw null;
            }
            Thread.sleep(richLong$.min$extension(waitUntilTrue$default$3, waitUntilTrue$default$4));
        }
        ((KafkaServer) servers().last()).shutdown();
        Thread.sleep(zookeeper().tickTime());
        int waitUntilLeaderIsElectedOrChanged2 = TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), new Some<>(BoxesRunTime.boxToInteger(waitUntilLeaderIsElectedOrChanged)), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        int unboxToInt4 = BoxesRunTime.unboxToInt(zkClient().getEpochForPartition(new TopicPartition("new-topic", 0)).get());
        Assert.assertEquals("Leader must return to 0", 0L, waitUntilLeaderIsElectedOrChanged2);
        if (Predef$.MODULE$ == null) {
            throw null;
        }
        Assert.assertEquals(new StringOps("Second epoch value should be %d").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(unboxToInt3 + 2)})), unboxToInt3 + 2, unboxToInt4);
    }

    @Test
    public void testLeaderElectionWithStaleControllerEpoch() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        KafkaZkClient zkClient = zkClient();
        Map$ Map = Predef$.MODULE$.Map();
        Predef$ predef$ = Predef$.MODULE$;
        Tuple2[] tuple2Arr = new Tuple2[1];
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0));
        GenTraversable apply = Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1}));
        if (predef$ArrowAssoc$ == null) {
            throw null;
        }
        tuple2Arr[0] = new Tuple2(ArrowAssoc, apply);
        int unboxToInt = BoxesRunTime.unboxToInt(testUtils$.createTopic(zkClient, "new-topic", (Map) Map.apply(predef$.wrapRefArray(tuple2Arr)), servers()).apply(BoxesRunTime.boxToInteger(0)));
        int unboxToInt2 = BoxesRunTime.unboxToInt(zkClient().getEpochForPartition(new TopicPartition("new-topic", 0)).get());
        debug(() -> {
            return new StringBuilder(14).append("leader Epoch: ").append(unboxToInt2).toString();
        });
        debug(() -> {
            if (Predef$.MODULE$ == null) {
                throw null;
            }
            return new StringOps("Leader is elected to be: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(unboxToInt)}));
        });
        Assert.assertTrue("Leader could be broker 0 or broker 1", unboxToInt == 0 || unboxToInt == 1);
        Assert.assertEquals("First epoch value should be 0", 0L, unboxToInt2);
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(2, zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(securityProtocol);
        scala.collection.immutable.Map map = ((TraversableOnce) servers().map(kafkaServer -> {
            return new Tuple2(new Broker(kafkaServer.config().brokerId(), "localhost", TestUtils$.MODULE$.boundPort(kafkaServer, TestUtils$.MODULE$.boundPort$default$2()), forSecurityProtocol, securityProtocol), BoxesRunTime.boxToLong(kafkaServer.kafkaController().brokerEpoch()));
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        Iterable iterable = (Iterable) map.keys().map(broker -> {
            return broker.node(forSecurityProtocol);
        }, Iterable$.MODULE$.canBuildFrom());
        ControllerContext controllerContext = new ControllerContext();
        controllerContext.setLiveBrokers(map);
        Metrics metrics = new Metrics();
        ControllerChannelManager controllerChannelManager = new ControllerChannelManager(controllerContext, fromProps, Time.SYSTEM, metrics, new StateChangeLogger(2, true, None$.MODULE$), ControllerChannelManager$.MODULE$.$lessinit$greater$default$6());
        controllerChannelManager.startup();
        try {
            LeaderAndIsrBatch addLiveLeaders = new LeaderAndIsrBatch(brokerId2()).setControllerId(2).setControllerEpoch(0).setBrokerEpoch(((KafkaServer) servers().apply(brokerId2())).kafkaController().brokerEpoch()).addPartitionState(new TopicPartition("new-topic", 0), new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setTopicName("new-topic").setPartitionIndex(0).setControllerEpoch(2).setLeader(brokerId2()).setLeaderEpoch(LeaderAndIsr$.MODULE$.initialLeaderEpoch()).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId1(), brokerId2()})).map(obj -> {
                return Integer.valueOf(BoxesRunTime.unboxToInt(obj));
            }, Seq$.MODULE$.canBuildFrom())).asJava()).setZkVersion(LeaderAndIsr$.MODULE$.initialZKVersion()).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})).map(obj2 -> {
                return Integer.valueOf(BoxesRunTime.unboxToInt(obj2));
            }, Seq$.MODULE$.canBuildFrom())).asJava()).setIsNew(false)).addLiveLeaders(iterable.toSet());
            BooleanRef create = BooleanRef.create(false);
            controllerChannelManager.sendControlMetadataBatch(brokerId2(), addLiveLeaders, (controlMetadataBatch, controlMetadataBatchResult) -> {
                $anonfun$testLeaderElectionWithStaleControllerEpoch$7(create, controlMetadataBatch, controlMetadataBatchResult);
                return BoxedUnit.UNIT;
            });
            TestUtils$ testUtils$2 = TestUtils$.MODULE$;
            long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
            long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
            if (testUtils$2 == null) {
                throw null;
            }
            long currentTimeMillis = System.currentTimeMillis();
            while (!create.elem) {
                if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                    throw Assertions$.MODULE$.fail($anonfun$testLeaderElectionWithStaleControllerEpoch$9(), new Position("TestUtils.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 877));
                }
                RichLong$ richLong$ = RichLong$.MODULE$;
                if (Predef$.MODULE$ == null) {
                    throw null;
                }
                Thread.sleep(richLong$.min$extension(waitUntilTrue$default$3, waitUntilTrue$default$4));
            }
            Assert.assertTrue("Stale controller epoch not detected by the broker", create.elem);
        } finally {
            controllerChannelManager.shutdown();
            metrics.close();
        }
    }

    public static final /* synthetic */ boolean $anonfun$testLeaderElectionAndEpoch$2(UpdateMetadataRequestData.UpdateMetadataPartitionState updateMetadataPartitionState) {
        return updateMetadataPartitionState.isr().size() == 2;
    }

    public static final /* synthetic */ boolean $anonfun$testLeaderElectionAndEpoch$1(LeaderElectionTest leaderElectionTest, String str, int i) {
        Option partitionInfo = ((KafkaServer) leaderElectionTest.servers().last()).metadataCache().getPartitionInfo(str, i);
        if (partitionInfo == null) {
            throw null;
        }
        return !partitionInfo.isEmpty() && $anonfun$testLeaderElectionAndEpoch$2((UpdateMetadataRequestData.UpdateMetadataPartitionState) partitionInfo.get());
    }

    public static final /* synthetic */ String $anonfun$testLeaderElectionAndEpoch$3() {
        return "Inconsistent metadata after second broker startup";
    }

    public static final /* synthetic */ void $anonfun$testLeaderElectionWithStaleControllerEpoch$7(BooleanRef booleanRef, ControlMetadataBatch controlMetadataBatch, ControlMetadataBatchResult controlMetadataBatchResult) {
        Errors error = controlMetadataBatchResult.error();
        Errors errors = Errors.STALE_CONTROLLER_EPOCH;
        booleanRef.elem = error != null ? error.equals(errors) : errors == null;
    }

    public static final /* synthetic */ String $anonfun$testLeaderElectionWithStaleControllerEpoch$9() {
        return "Controller epoch should be stale";
    }
}
