package kafka.server;

import java.util.Properties;
import junit.framework.Assert;
import kafka.admin.AdminUtils$;
import kafka.api.LeaderAndIsr;
import kafka.api.LeaderAndIsrRequest;
import kafka.api.LeaderAndIsrResponse;
import kafka.api.RequestOrResponse;
import kafka.common.ErrorMapping$;
import kafka.controller.ControllerChannelManager;
import kafka.controller.ControllerContext;
import kafka.controller.LeaderIsrAndControllerEpoch;
import kafka.utils.TestUtils$;
import kafka.utils.ZkUtils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.ZkClient;
import org.scalatest.junit.JUnit3Suite;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: LeaderElectionTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-a\u0001B\u0001\u0003\u0001\u001d\u0011!\u0003T3bI\u0016\u0014X\t\\3di&|g\u000eV3ti*\u00111\u0001B\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003\u0015\tQa[1gW\u0006\u001c\u0001a\u0005\u0003\u0001\u0011IA\u0002CA\u0005\u0011\u001b\u0005Q!BA\u0006\r\u0003\u0015QWO\\5u\u0015\tia\"A\u0005tG\u0006d\u0017\r^3ti*\tq\"A\u0002pe\u001eL!!\u0005\u0006\u0003\u0017)+f.\u001b;4'VLG/\u001a\t\u0003'Yi\u0011\u0001\u0006\u0006\u0003+\u0011\t!A_6\n\u0005]!\"\u0001\u0006.p_.+W\r]3s)\u0016\u001cH\u000fS1s]\u0016\u001c8\u000f\u0005\u0002\u001a95\t!DC\u0001\u001c\u0003\u0015\u00198-\u00197b\u0013\ti\"DA\u0006TG\u0006d\u0017m\u00142kK\u000e$\b\"B\u0010\u0001\t\u0003\u0001\u0013A\u0002\u001fj]&$h\bF\u0001\"!\t\u0011\u0003!D\u0001\u0003\u0011\u001d!\u0003A1A\u0005\u0002\u0015\n\u0011B\u0019:pW\u0016\u0014\u0018\nZ\u0019\u0016\u0003\u0019\u0002\"!G\u0014\n\u0005!R\"aA%oi\"1!\u0006\u0001Q\u0001\n\u0019\n!B\u0019:pW\u0016\u0014\u0018\nZ\u0019!\u0011\u001da\u0003A1A\u0005\u0002\u0015\n\u0011B\u0019:pW\u0016\u0014\u0018\n\u001a\u001a\t\r9\u0002\u0001\u0015!\u0003'\u0003)\u0011'o\\6fe&#'\u0007\t\u0005\ba\u0001\u0011\r\u0011\"\u0001&\u0003\u0015\u0001xN\u001d;2\u0011\u0019\u0011\u0004\u0001)A\u0005M\u00051\u0001o\u001c:uc\u0001Bq\u0001\u000e\u0001C\u0002\u0013\u0005Q%A\u0003q_J$(\u0007\u0003\u00047\u0001\u0001\u0006IAJ\u0001\u0007a>\u0014HO\r\u0011\t\u000fa\u0002!\u0019!C\u0001s\u0005a1m\u001c8gS\u001e\u0004&o\u001c9tcU\t!\b\u0005\u0002<\u00016\tAH\u0003\u0002>}\u0005!Q\u000f^5m\u0015\u0005y\u0014\u0001\u00026bm\u0006L!!\u0011\u001f\u0003\u0015A\u0013x\u000e]3si&,7\u000f\u0003\u0004D\u0001\u0001\u0006IAO\u0001\u000eG>tg-[4Qe>\u00048/\r\u0011\t\u000f\u0015\u0003!\u0019!C\u0001s\u0005a1m\u001c8gS\u001e\u0004&o\u001c9te!1q\t\u0001Q\u0001\ni\nQbY8oM&<\u0007K]8qgJ\u0002\u0003bB%\u0001\u0001\u0004%\tAS\u0001\bg\u0016\u0014h/\u001a:t+\u0005Y\u0005c\u0001'U/:\u0011QJ\u0015\b\u0003\u001dFk\u0011a\u0014\u0006\u0003!\u001a\ta\u0001\u0010:p_Rt\u0014\"A\u000e\n\u0005MS
\u0012a\u00029bG.\fw-Z\u0005\u0003+Z\u00131aU3r\u0015\t\u0019&\u0004\u0005\u0002#1&\u0011\u0011L\u0001\u0002\f\u0017\u000647.Y*feZ,'\u000fC\u0004\\\u0001\u0001\u0007I\u0011\u0001/\u0002\u0017M,'O^3sg~#S-\u001d\u000b\u0003;\u0002\u0004\"!\u00070\n\u0005}S\"\u0001B+oSRDq!\u0019.\u0002\u0002\u0003\u00071*A\u0002yIEBaa\u0019\u0001!B\u0013Y\u0015\u0001C:feZ,'o\u001d\u0011\t\u000f\u0015\u0004\u0001\u0019!C\u0001M\u0006a2\u000f^1mK\u000e{g\u000e\u001e:pY2,'/\u00129pG\"$U\r^3di\u0016$W#A4\u0011\u0005eA\u0017BA5\u001b\u0005\u001d\u0011un\u001c7fC:Dqa\u001b\u0001A\u0002\u0013\u0005A.\u0001\u0011ti\u0006dWmQ8oiJ|G\u000e\\3s\u000bB|7\r\u001b#fi\u0016\u001cG/\u001a3`I\u0015\fHCA/n\u0011\u001d\t'.!AA\u0002\u001dDaa\u001c\u0001!B\u00139\u0017!H:uC2,7i\u001c8ue>dG.\u001a:Fa>\u001c\u0007\u000eR3uK\u000e$X\r\u001a\u0011\t\u000bE\u0004A\u0011\t:\u0002\u000bM,G/\u00169\u0015\u0003uCQ\u0001\u001e\u0001\u0005BI\f\u0001\u0002^3be\u0012{wO\u001c\u0005\u0006m\u0002!\ta^\u0001\u001bi\u0016\u001cH\u000fT3bI\u0016\u0014X\t\\3di&|g.\u00118e\u000bB|7\r[\u000b\u0002;\")\u0011\u0010\u0001C\u0001e\u0006QC/Z:u\u0019\u0016\fG-\u001a:FY\u0016\u001cG/[8o/&$\bn\u0015;bY\u0016\u001cuN\u001c;s_2dWM]#q_\u000eD\u0007\"B>\u0001\t\u0013a\u0018\u0001H:uC2,7i\u001c8ue>dG.\u001a:Fa>\u001c\u0007nQ1mY\n\f7m\u001b\u000b\u0003;vDQA >A\u0002}\f\u0001B]3ta>t7/\u001a\t\u0005\u0003\u0003\t9!\u0004\u0002\u0002\u0004)\u0019\u0011Q\u0001\u0003\u0002\u0007\u0005\u0004\u0018.\u0003\u0003\u0002\n\u0005\r!!\u0005*fcV,7\u000f^(s%\u0016\u001c\bo\u001c8tK\u0002")
/* loaded from: input_file:kafka/server/LeaderElectionTest.class */
/**
 * Leader-election tests against a two-broker cluster (broker ids 0 and 1) backed by the
 * ZooKeeper instance supplied by {@link ZooKeeperTestHarness}.
 *
 * <p>This source is the Java rendering of a Scala test class, so the Scala-generated accessor
 * names ({@code x_$eq}, {@code ..._setter_...}) and the trait forwarders are part of the class's
 * binary interface and are kept unchanged.
 */
public class LeaderElectionTest extends JUnit3Suite implements ZooKeeperTestHarness {
    private final int brokerId1;
    private final int brokerId2;
    private final int port1;
    private final int port2;
    private final Properties configProps1;
    private final Properties configProps2;
    private Seq<KafkaServer> servers;
    private boolean staleControllerEpochDetected;
    // The three harness values below are assigned through the ZooKeeperTestHarness "_setter_"
    // methods rather than directly in the constructor, so they cannot be declared final in Java.
    private String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private int zkConnectionTimeout;
    private int zkSessionTimeout;

    /** Returns the ZooKeeper connect string stored by the harness setter. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public String zkConnect() {
        return this.zkConnect;
    }

    /** Returns the embedded ZooKeeper instance currently held by this test. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    /** Stores the embedded ZooKeeper instance. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void zookeeper_$eq(EmbeddedZookeeper embeddedZookeeper) {
        this.zookeeper = embeddedZookeeper;
    }

    /** Returns the ZooKeeper client currently held by this test. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public ZkClient zkClient() {
        return this.zkClient;
    }

    /** Stores the ZooKeeper client. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void zkClient_$eq(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    /** Returns the ZooKeeper connection timeout stored by the harness setter. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    /** Returns the ZooKeeper session timeout stored by the harness setter. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    /** Forwarder that lets the harness trait invoke the superclass {@code setUp}. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public final void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super/*junit.framework.TestCase*/.setUp();
    }

    /** Forwarder that lets the harness trait invoke the superclass {@code tearDown}. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public final void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super/*junit.framework.TestCase*/.tearDown();
    }

    /** Trait-generated setter for the ZooKeeper connect string. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String str) {
        this.zkConnect = str;
    }

    /** Trait-generated setter for the ZooKeeper connection timeout. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int i) {
        this.zkConnectionTimeout = i;
    }

    /** Trait-generated setter for the ZooKeeper session timeout. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int i) {
        this.zkSessionTimeout = i;
    }

    /** Returns the id of the first broker (0). */
    public int brokerId1() {
        return this.brokerId1;
    }

    /** Returns the id of the second broker (1). */
    public int brokerId2() {
        return this.brokerId2;
    }

    /** Returns the port chosen for the first broker. */
    public int port1() {
        return this.port1;
    }

    /** Returns the port chosen for the second broker. */
    public int port2() {
        return this.port2;
    }

    /** Returns the broker configuration properties built for the first broker. */
    public Properties configProps1() {
        return this.configProps1;
    }

    /** Returns the broker configuration properties built for the second broker. */
    public Properties configProps2() {
        return this.configProps2;
    }

    /** Returns the servers created by {@link #setUp()}; empty before set-up has run. */
    public Seq<KafkaServer> servers() {
        return this.servers;
    }

    /** Replaces the sequence of servers managed by this test. */
    public void servers_$eq(Seq<KafkaServer> seq) {
        this.servers = seq;
    }

    /** Returns the flag last written by the stale-controller-epoch response callback. */
    public boolean staleControllerEpochDetected() {
        return this.staleControllerEpochDetected;
    }

    /** Sets the stale-controller-epoch flag. */
    public void staleControllerEpochDetected_$eq(boolean z) {
        this.staleControllerEpochDetected = z;
    }

    /**
     * Runs the harness set-up, then creates one server per broker configuration and appends both
     * to {@link #servers()} (first broker first, second broker last).
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void setUp() {
        ZooKeeperTestHarness.Cclass.setUp(this);
        servers_$eq((Seq) servers().$plus$plus(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaServer[]{TestUtils$.MODULE$.createServer(new KafkaConfig(configProps1()), TestUtils$.MODULE$.createServer$default$2()), TestUtils$.MODULE$.createServer(new KafkaConfig(configProps2()), TestUtils$.MODULE$.createServer$default$2())})), Seq$.MODULE$.canBuildFrom()));
    }

    /**
     * Applies the two per-server clean-up functions to every server and then runs the harness
     * tear-down. The harness tear-down sits in a {@code finally} block so it still runs when a
     * per-server clean-up step throws.
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void tearDown() {
        try {
            // NOTE(review): the two function classes are defined outside this file; presumably
            // they shut each server down and then remove its log directories - confirm.
            servers().map(new LeaderElectionTest$$anonfun$tearDown$1(this), Seq$.MODULE$.canBuildFrom());
            servers().map(new LeaderElectionTest$$anonfun$tearDown$2(this), Seq$.MODULE$.canBuildFrom());
        } finally {
            ZooKeeperTestHarness.Cclass.tearDown(this);
        }
    }

    /**
     * Elects a leader for partition 0 of "new-topic", then shuts down the last server and, after
     * restarting it, the first server, checking the leader and the partition epoch after each step.
     */
    public void testLeaderElectionAndEpoch() {
        final String topic = "new-topic";
        final int partitionId = 0;

        createTopicOnBothBrokers(topic, partitionId);

        // Phase 1: initial election.
        Option<Object> leader1 = TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic, partitionId, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        int leaderEpoch1 = ZkUtils$.MODULE$.getEpochForPartition(zkClient(), topic, partitionId);
        TestUtils$.MODULE$.debug(new LeaderElectionTest$$anonfun$testLeaderElectionAndEpoch$5(this, leaderEpoch1));
        TestUtils$.MODULE$.debug(new LeaderElectionTest$$anonfun$testLeaderElectionAndEpoch$6(this, leader1));
        Assert.assertTrue("Leader should get elected", leader1.isDefined());
        Assert.assertTrue("Leader could be broker 0 or broker 1", BoxesRunTime.unboxToInt(leader1.getOrElse(new LeaderElectionTest$$anonfun$testLeaderElectionAndEpoch$1(this))) == 0 || BoxesRunTime.unboxToInt(leader1.getOrElse(new LeaderElectionTest$$anonfun$testLeaderElectionAndEpoch$2(this))) == 1);
        Assert.assertEquals("First epoch value should be 0", 0, leaderEpoch1);

        // Phase 2: stop the last server (broker 1); leadership must end up on broker 0.
        ((KafkaServer) servers().last()).shutdown();
        Option<Object> leader2 = TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic, partitionId, 1500L, oldLeaderUnlessAlready(leader1, 0));
        int leaderEpoch2 = ZkUtils$.MODULE$.getEpochForPartition(zkClient(), topic, partitionId);
        TestUtils$.MODULE$.debug(new LeaderElectionTest$$anonfun$testLeaderElectionAndEpoch$7(this, leader1));
        TestUtils$.MODULE$.debug(new LeaderElectionTest$$anonfun$testLeaderElectionAndEpoch$8(this, leaderEpoch2));
        Assert.assertEquals("Leader must move to broker 0", 0, BoxesRunTime.unboxToInt(leader2.getOrElse(new LeaderElectionTest$$anonfun$testLeaderElectionAndEpoch$3(this))));
        // The epoch is expected to advance by one whether or not the leader actually changed, so a
        // single assertion covers both cases. The message reports the sum, not "<epoch>1".
        Assert.assertEquals("Second epoch value should be " + (leaderEpoch1 + 1), leaderEpoch1 + 1, leaderEpoch2);

        // Phase 3: bring broker 1 back, stop the first server (broker 0); leadership must move to 1.
        ((KafkaServer) servers().last()).startup();
        ((KafkaServer) servers().head()).shutdown();
        Thread.sleep(zookeeper().tickTime());
        Option<Object> leader3 = TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic, partitionId, 1500L, oldLeaderUnlessAlready(leader2, 1));
        int leaderEpoch3 = ZkUtils$.MODULE$.getEpochForPartition(zkClient(), topic, partitionId);
        TestUtils$.MODULE$.debug(new LeaderElectionTest$$anonfun$testLeaderElectionAndEpoch$9(this, leaderEpoch3));
        TestUtils$.MODULE$.debug(new LeaderElectionTest$$anonfun$testLeaderElectionAndEpoch$10(this, leader3));
        Assert.assertEquals("Leader must return to 1", 1, BoxesRunTime.unboxToInt(leader3.getOrElse(new LeaderElectionTest$$anonfun$testLeaderElectionAndEpoch$4(this))));
        // NOTE(review): both messages below say "Second" although this is the third epoch reading.
        if (BoxesRunTime.unboxToInt(leader2.get()) == BoxesRunTime.unboxToInt(leader3.get())) {
            // Same leader as before: the epoch is expected to be unchanged.
            Assert.assertEquals(new StringBuilder().append("Second epoch value should be ").append(BoxesRunTime.boxToInteger(leaderEpoch2)).toString(), leaderEpoch2, leaderEpoch3);
        } else {
            Assert.assertEquals(Predef$.MODULE$.augmentString("Second epoch value should be %d").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(leaderEpoch2 + 1)})), leaderEpoch2 + 1, leaderEpoch3);
        }
    }

    /**
     * Elects a leader for partition 0 of "new-topic", then sends the second broker a
     * leader-and-ISR request through a separate controller channel manager and asserts that the
     * response callback recorded the stale-controller-epoch error code.
     */
    public void testLeaderElectionWithStaleControllerEpoch() {
        final String topic = "new-topic";
        final int partitionId = 0;

        createTopicOnBothBrokers(topic, partitionId);

        Option<Object> leader1 = TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic, partitionId, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        int leaderEpoch1 = ZkUtils$.MODULE$.getEpochForPartition(zkClient(), topic, partitionId);
        TestUtils$.MODULE$.debug(new LeaderElectionTest$$anonfun$testLeaderElectionWithStaleControllerEpoch$4(this, leaderEpoch1));
        TestUtils$.MODULE$.debug(new LeaderElectionTest$$anonfun$testLeaderElectionWithStaleControllerEpoch$5(this, leader1));
        Assert.assertTrue("Leader should get elected", leader1.isDefined());
        Assert.assertTrue("Leader could be broker 0 or broker 1", BoxesRunTime.unboxToInt(leader1.getOrElse(new LeaderElectionTest$$anonfun$testLeaderElectionWithStaleControllerEpoch$1(this))) == 0 || BoxesRunTime.unboxToInt(leader1.getOrElse(new LeaderElectionTest$$anonfun$testLeaderElectionWithStaleControllerEpoch$2(this))) == 1);
        Assert.assertEquals("First epoch value should be 0", 0, leaderEpoch1);

        // Configuration for a third broker id (2) used only to drive the channel manager.
        KafkaConfig controllerConfig = new KafkaConfig(TestUtils$.MODULE$.createBrokerConfig(2, TestUtils$.MODULE$.choosePort()));
        Seq brokers = (Seq) servers().map(new LeaderElectionTest$$anonfun$1(this), Seq$.MODULE$.canBuildFrom());
        ControllerContext controllerContext = new ControllerContext(zkClient(), 6000);
        controllerContext.liveBrokers_$eq(brokers.toSet());
        ControllerChannelManager controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig);
        controllerChannelManager.startup();
        try {
            HashMap leaderAndIsr = new HashMap();
            leaderAndIsr.put(new Tuple2(topic, BoxesRunTime.boxToInteger(partitionId)), new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2(), List$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId1(), brokerId2()}))), 2));
            // NOTE(review): the numeric arguments (2, 0, 0) are presumably controller id, controller
            // epoch and correlation id, the epoch 0 being the stale value - confirm against
            // LeaderAndIsrRequest.
            controllerChannelManager.sendRequest(brokerId2(), new LeaderAndIsrRequest(leaderAndIsr.mapValues(new LeaderElectionTest$$anonfun$2(this)).toMap(Predef$.MODULE$.conforms()), brokers.toSet(), 2, 0, 0, ""), new LeaderElectionTest$$anonfun$testLeaderElectionWithStaleControllerEpoch$6(this));
            TestUtils$.MODULE$.waitUntilTrue(new LeaderElectionTest$$anonfun$testLeaderElectionWithStaleControllerEpoch$3(this), 1000L);
            Assert.assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected());
        } finally {
            // Always release the channel manager, even when the request or an assertion fails.
            controllerChannelManager.shutdown();
        }
    }

    /**
     * Response callback: records whether the response's error code equals the
     * stale-controller-epoch error code.
     *
     * @param requestOrResponse the received response; cast to {@link LeaderAndIsrResponse}
     */
    public final void kafka$server$LeaderElectionTest$$staleControllerEpochCallback(RequestOrResponse requestOrResponse) {
        short staleEpochCode = ErrorMapping$.MODULE$.StaleControllerEpochCode();
        short responseCode = ((LeaderAndIsrResponse) requestOrResponse).errorCode();
        staleControllerEpochDetected_$eq(staleEpochCode == responseCode);
    }

    /**
     * Runs the harness trait initializer, fixes the broker ids to 0 and 1, picks a port and builds
     * the configuration properties for each broker, and starts with no servers and the
     * stale-epoch flag cleared.
     */
    public LeaderElectionTest() {
        ZooKeeperTestHarness.Cclass.$init$(this);
        this.brokerId1 = 0;
        this.brokerId2 = 1;
        this.port1 = TestUtils$.MODULE$.choosePort();
        this.port2 = TestUtils$.MODULE$.choosePort();
        this.configProps1 = TestUtils$.MODULE$.createBrokerConfig(brokerId1(), port1());
        this.configProps2 = TestUtils$.MODULE$.createBrokerConfig(brokerId2(), port2());
        this.servers = Seq$.MODULE$.empty();
        this.staleControllerEpochDetected = false;
    }

    /**
     * Writes the partition assignment for {@code topic} to ZooKeeper, mapping
     * {@code partitionId} to the replica list (0, 1). The remaining arguments use the callee's
     * default values.
     */
    private void createTopicOnBothBrokers(String topic, int partitionId) {
        AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient(), topic, Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$.MODULE$.any2ArrowAssoc(BoxesRunTime.boxToInteger(partitionId)).$minus$greater(Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})))})), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$4(), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$5());
    }

    /**
     * Builds the last argument passed to {@code waitUntilLeaderIsElectedOrChanged}: {@code None}
     * when {@code leader} already is {@code brokerId}, otherwise {@code leader} itself.
     *
     * <p>NOTE(review): presumably that argument is the previous leader the wait should see
     * replaced - confirm against TestUtils.
     *
     * @throws java.util.NoSuchElementException if {@code leader} is empty (from {@code Option.get})
     */
    @SuppressWarnings("unchecked")
    private static Option<Object> oldLeaderUnlessAlready(Option<Object> leader, int brokerId) {
        if (BoxesRunTime.unboxToInt(leader.get()) == brokerId) {
            // None is an Option of Nothing on the Scala side; the cast only adapts Java generics.
            return (Option<Object>) (Option<?>) None$.MODULE$;
        }
        return leader;
    }
}
