package kafka.server;

import java.util.Properties;
import junit.framework.Assert;
import kafka.admin.CreateTopicCommand$;
import kafka.api.FetchRequestBuilder;
import kafka.consumer.SimpleConsumer;
import kafka.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.utils.IntEncoder;
import kafka.utils.TestUtils$;
import kafka.utils.Utils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import org.scalatest.junit.JUnit3Suite;
import scala.Predef$;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.reflect.ScalaSignature;
import scala.runtime.ObjectRef;

@ScalaSignature(bytes = "\u0006\u0001%4A!\u0001\u0002\u0001\u000f\t\u00112+\u001a:wKJ\u001c\u0006.\u001e;e_^tG+Z:u\u0015\t\u0019A!\u0001\u0004tKJ4XM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001\t%A\u0011\u0011\u0002E\u0007\u0002\u0015)\u00111\u0002D\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0003\u001b9\t\u0011b]2bY\u0006$Xm\u001d;\u000b\u0003=\t1a\u001c:h\u0013\t\t\"BA\u0006K+:LGoM*vSR,\u0007CA\n\u0017\u001b\u0005!\"BA\u000b\u0005\u0003\tQ8.\u0003\u0002\u0018)\t!\"l\\8LK\u0016\u0004XM\u001d+fgRD\u0015M\u001d8fgNDQ!\u0007\u0001\u0005\u0002i\ta\u0001P5oSRtD#A\u000e\u0011\u0005q\u0001Q\"\u0001\u0002\t\u000fy\u0001!\u0019!C\u0001?\u0005!\u0001o\u001c:u+\u0005\u0001\u0003CA\u0011%\u001b\u0005\u0011#\"A\u0012\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0015\u0012#aA%oi\"1q\u0005\u0001Q\u0001\n\u0001\nQ\u0001]8si\u0002Bq!\u000b\u0001C\u0002\u0013\u0005!&A\u0003qe>\u00048/F\u0001,!\ta\u0013'D\u0001.\u0015\tqs&\u0001\u0003vi&d'\"\u0001\u0019\u0002\t)\fg/Y\u0005\u0003e5\u0012!\u0002\u0015:pa\u0016\u0014H/[3t\u0011\u0019!\u0004\u0001)A\u0005W\u00051\u0001O]8qg\u0002BqA\u000e\u0001C\u0002\u0013\u0005q'\u0001\u0004d_:4\u0017nZ\u000b\u0002qA\u0011A$O\u0005\u0003u\t\u00111bS1gW\u0006\u001cuN\u001c4jO\"1A\b\u0001Q\u0001\na\nqaY8oM&<\u0007\u0005C\u0004?\u0001\t\u0007I\u0011A \u0002\t!|7\u000f^\u000b\u0002\u0001B\u0011\u0011\tR\u0007\u0002\u0005*\u00111iL\u0001\u0005Y\u0006tw-\u0003\u0002F\u0005\n11\u000b\u001e:j]\u001eDaa\u0012\u0001!\u0002\u0013\u0001\u0015!\u00025pgR\u0004\u0003bB%\u0001\u0005\u0004%\taP\u0001\u0006i>\u0004\u0018n\u0019\u0005\u0007\u0017\u0002\u0001\u000b\u0011\u0002!\u0002\rQ|\u0007/[2!\u0011\u001di\u0005A1A\u0005\u00029\u000bQa]3oiF*\u0012a\u0014\t\u0004!V\u0003U\"A)\u000b\u0005I\u001b\u0016!C5n[V$\u0018M\u00197f\u0015\t!&%\u0001\u0006d_2dWm\u0019;j_:L!AV)\u0003\t1K7\u000f\u001e\u0005\u00071\u0002\u0001\u000b\u0011B(\u0002\rM,g\u000e^\u0019!\u0011\u001dQ\u0006A1A\u0005\u00029\u000bQa]3oiJBa\u0001\u0018\u0001!\u0002\u0013y\u0015AB:f]R\u0014\u0004\u0005C\u0003_\u0001\u0011\u0005q,A\tuKN$8\t\\3b]NCW\u000f\u001e3po:$\u0012\u0001\u0019\t\u0003C\u0005L!A\u0019\u0012\u0003\tUs\u0017\u000e\u001e\u0015\u0003;\u0012\u0004\"!Z4\u000e\u0003\u0019T!a\u0003\b\n\u0005!4'\u0001\u0002+fgR\u0004")
/* loaded from: input_file:kafka/server/ServerShutdownTest.class */
public class ServerShutdownTest extends JUnit3Suite implements ZooKeeperTestHarness {
    private final int port;
    private final Properties props;
    private final KafkaConfig config;
    private final String host;
    private final String topic;
    private final List<String> sent1;
    private final List<String> sent2;
    private final String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;

    @Override // kafka.zk.ZooKeeperTestHarness
    public String zkConnect() {
        return this.zkConnect;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void zookeeper_$eq(EmbeddedZookeeper embeddedZookeeper) {
        this.zookeeper = embeddedZookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public ZkClient zkClient() {
        return this.zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void zkClient_$eq(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super/*junit.framework.TestCase*/.setUp();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super/*junit.framework.TestCase*/.tearDown();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String str) {
        this.zkConnect = str;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int i) {
        this.zkConnectionTimeout = i;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int i) {
        this.zkSessionTimeout = i;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void setUp() {
        ZooKeeperTestHarness.Cclass.setUp(this);
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void tearDown() {
        ZooKeeperTestHarness.Cclass.tearDown(this);
    }

    public int port() {
        return this.port;
    }

    public Properties props() {
        return this.props;
    }

    public KafkaConfig config() {
        return this.config;
    }

    public String host() {
        return this.host;
    }

    public String topic() {
        return this.topic;
    }

    public List<String> sent1() {
        return this.sent1;
    }

    public List<String> sent2() {
        return this.sent2;
    }

    @Test
    public void testCleanShutdown() {
        ByteBufferMessageSet byteBufferMessageSet;
        ObjectRef objectRef = new ObjectRef(new KafkaServer(config(), KafkaServer$.MODULE$.$lessinit$greater$default$2()));
        ((KafkaServer) objectRef.elem).startup();
        Properties producerConfig = TestUtils$.MODULE$.getProducerConfig(TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaConfig[]{config()}))), TestUtils$.MODULE$.getProducerConfig$default$2());
        producerConfig.put("key.serializer.class", IntEncoder.class.getName().toString());
        Producer producer = new Producer(new ProducerConfig(producerConfig));
        CreateTopicCommand$.MODULE$.createTopic(zkClient(), topic(), 1, 1, "0");
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaServer[]{(KafkaServer) objectRef.elem})), topic(), 0, 1000L);
        producer.send((Seq) sent1().map(new ServerShutdownTest$$anonfun$testCleanShutdown$1(this), List$.MODULE$.canBuildFrom()));
        ((KafkaServer) objectRef.elem).shutdown();
        config().logDirs().foreach(new ServerShutdownTest$$anonfun$testCleanShutdown$2(this, objectRef));
        producer.close();
        objectRef.elem = new KafkaServer(config(), KafkaServer$.MODULE$.$lessinit$greater$default$2());
        ((KafkaServer) objectRef.elem).startup();
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaServer[]{(KafkaServer) objectRef.elem})), topic(), 0, 1000L);
        Producer producer2 = new Producer(new ProducerConfig(producerConfig));
        SimpleConsumer simpleConsumer = new SimpleConsumer(host(), port(), 1000000, 65536, "");
        ByteBufferMessageSet byteBufferMessageSet2 = null;
        while (true) {
            byteBufferMessageSet = byteBufferMessageSet2;
            if (byteBufferMessageSet != null && byteBufferMessageSet.validBytes() != 0) {
                break;
            } else {
                byteBufferMessageSet2 = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic(), 0, 0L, 10000).maxWait(0).build()).messageSet(topic(), 0);
            }
        }
        Assert.assertEquals(sent1(), byteBufferMessageSet.map(new ServerShutdownTest$$anonfun$testCleanShutdown$3(this), Iterable$.MODULE$.canBuildFrom()));
        long nextOffset = ((MessageAndOffset) byteBufferMessageSet.last()).nextOffset();
        producer2.send((Seq) sent2().map(new ServerShutdownTest$$anonfun$testCleanShutdown$4(this), List$.MODULE$.canBuildFrom()));
        ByteBufferMessageSet byteBufferMessageSet3 = null;
        while (true) {
            ByteBufferMessageSet byteBufferMessageSet4 = byteBufferMessageSet3;
            if (byteBufferMessageSet4 != null && byteBufferMessageSet4.validBytes() != 0) {
                Assert.assertEquals(sent2(), byteBufferMessageSet4.map(new ServerShutdownTest$$anonfun$testCleanShutdown$5(this), Iterable$.MODULE$.canBuildFrom()));
                simpleConsumer.close();
                producer2.close();
                ((KafkaServer) objectRef.elem).shutdown();
                Utils$.MODULE$.rm(((KafkaServer) objectRef.elem).config().logDirs());
                return;
            }
            byteBufferMessageSet3 = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic(), 0, nextOffset, 10000).build()).messageSet(topic(), 0);
        }
    }

    public ServerShutdownTest() {
        ZooKeeperTestHarness.Cclass.$init$(this);
        this.port = TestUtils$.MODULE$.choosePort();
        this.props = TestUtils$.MODULE$.createBrokerConfig(0, port());
        this.config = new KafkaConfig(props());
        this.host = "localhost";
        this.topic = "test";
        this.sent1 = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"hello", "there"}));
        this.sent2 = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"more", "messages"}));
    }
}
