package kafka.server;

import java.util.Properties;
import junit.framework.Assert;
import kafka.admin.AdminUtils$;
import kafka.api.FetchRequestBuilder;
import kafka.consumer.SimpleConsumer;
import kafka.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.utils.IntEncoder;
import kafka.utils.TestUtils$;
import kafka.utils.Utils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import org.scalatest.junit.JUnit3Suite;
import scala.Array$;
import scala.Predef$;
import scala.ScalaObject;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.reflect.ClassManifest$;
import scala.reflect.ScalaSignature;
import scala.runtime.ObjectRef;

/* compiled from: ServerShutdownTest.scala */
@ScalaSignature(bytes = "\u0006\u0001E4\u0001\"\u0001\u0002\u0005\u0002\u0003\u0005\ta\u0002\u0002\u0013'\u0016\u0014h/\u001a:TQV$Hm\\<o)\u0016\u001cHO\u0003\u0002\u0004\t\u000511/\u001a:wKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\u0011\u0001\u0001B\u0005\r\u0011\u0005%\u0001R\"\u0001\u0006\u000b\u0005-a\u0011!\u00026v]&$(BA\u0007\u000f\u0003%\u00198-\u00197bi\u0016\u001cHOC\u0001\u0010\u0003\ry'oZ\u0005\u0003#)\u00111BS+oSR\u001c4+^5uKB\u00111CF\u0007\u0002))\u0011Q\u0003B\u0001\u0003u.L!a\u0006\u000b\u0003)i{wnS3fa\u0016\u0014H+Z:u\u0011\u0006\u0014h.Z:t!\tIB$D\u0001\u001b\u0015\u0005Y\u0012!B:dC2\f\u0017BA\u000f\u001b\u0005-\u00196-\u00197b\u001f\nTWm\u0019;\t\u000b}\u0001A\u0011\u0001\u0011\u0002\rqJg.\u001b;?)\u0005\t\u0003C\u0001\u0012\u0001\u001b\u0005\u0011\u0001b\u0002\u0013\u0001\u0005\u0004%\t!J\u0001\u0005a>\u0014H/F\u0001'!\tIr%\u0003\u0002)5\t\u0019\u0011J\u001c;\t\r)\u0002\u0001\u0015!\u0003'\u0003\u0015\u0001xN\u001d;!\u0011\u001da\u0003A1A\u0005\u00025\nQ\u0001\u001d:paN,\u0012A\f\t\u0003_Qj\u0011\u0001\r\u0006\u0003cI\nA!\u001e;jY*\t1'\u0001\u0003kCZ\f\u0017BA\u001b1\u0005)\u0001&o\u001c9feRLWm\u001d\u0005\u0007o\u0001\u0001\u000b\u0011\u0002\u0018\u0002\rA\u0014x\u000e]:!\u0011\u001dI\u0004A1A\u0005\u0002i\naaY8oM&<W#A\u001e\u0011\u0005\tb\u0014BA\u001f\u0003\u0005-Y\u0015MZ6b\u0007>tg-[4\t\r}\u0002\u0001\u0015!\u0003<\u0003\u001d\u0019wN\u001c4jO\u0002Bq!\u0011\u0001C\u0002\u0013\u0005!)\u0001\u0003i_N$X#A\"\u0011\u0005\u0011;U\"A#\u000b\u0005\u0019\u0013\u0014\u0001\u00027b]\u001eL!\u0001S#\u0003\rM#(/\u001b8h\u0011\u0019Q\u0005\u0001)A\u0005\u0007\u0006)\u0001n\\:uA!9A\n\u0001b\u0001\n\u0003\u0011\u0015!\u0002;pa&\u001c\u0007B\u0002(\u0001A\u0003%1)\u0001\u0004u_BL7\r\t\u0005\b!\u0002\u0011\r\u0011\"\u0001R\u0003\u0015\u0019XM\u001c;2+\u0005\u0011\u0006cA*Y\u00076\tAK\u0003\u0002V-\u0006I\u0011.\\7vi\u0006\u0014G.\u001a\u0006\u0003/j\t!bY8mY\u0016\u001cG/[8o\u0013\tIFK\u0001\u0003MSN$\bBB.\u0001A\u0003%!+\u0001\u0004tK:$\u0018\u0007\t\u0005\b;\u0002\u0011\r\u0011\"\u0001R\u0003\u0015\u0019XM\u001c;3\u0011\u0019y\u0006\u0001)A\u0005%\u000611/\u001a8ue\u0001BQ!\u0019\u0001\u0005\u0002\t\f\u0011\u0003^3ti\u000ecW-\u00198TQV$Hm\\<o)\u0005\u0019\u0007CA\re\u0013\t)'D\u0001\u0003V]&$\bF\u00011h!\tA'.D\u0001j\u0015\tYa\"\u0003\u0002lS\n!A+Z:u\u0011\u0015i\u0007\u0001\"\u0001c\u0003\u001d\"Xm\u001d;DY\u0016\fgn\u00155vi\u0012|wO\\,ji\"$U\r\\3uKR{\u0007/[2F]\u0006\u0014G.\u001a3)\u00051<\u0007\"\u00029\u0001\t\u0003\u0011\u0017\u0001\b<fe&4\u0017PT8o\t\u0006,Wn\u001c8UQJ,\u0017\rZ:Ti\u0006$Xo\u001d")
/* loaded from: input_file:kafka/server/ServerShutdownTest.class */
public class ServerShutdownTest extends JUnit3Suite implements ZooKeeperTestHarness, ScalaObject {
    private final int port;
    private final Properties props;
    private final KafkaConfig config;
    private final String host;
    private final String topic;
    private final List<String> sent1;
    private final List<String> sent2;
    private final String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;

    @Override // kafka.zk.ZooKeeperTestHarness
    public String zkConnect() {
        return this.zkConnect;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void zookeeper_$eq(EmbeddedZookeeper embeddedZookeeper) {
        this.zookeeper = embeddedZookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public ZkClient zkClient() {
        return this.zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void zkClient_$eq(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public final void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super/*junit.framework.TestCase*/.setUp();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public final void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super/*junit.framework.TestCase*/.tearDown();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String str) {
        this.zkConnect = str;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int i) {
        this.zkConnectionTimeout = i;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int i) {
        this.zkSessionTimeout = i;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void setUp() {
        ZooKeeperTestHarness.Cclass.setUp(this);
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void tearDown() {
        ZooKeeperTestHarness.Cclass.tearDown(this);
    }

    public int port() {
        return this.port;
    }

    public Properties props() {
        return this.props;
    }

    public KafkaConfig config() {
        return this.config;
    }

    public String host() {
        return this.host;
    }

    public String topic() {
        return this.topic;
    }

    public List<String> sent1() {
        return this.sent1;
    }

    public List<String> sent2() {
        return this.sent2;
    }

    @Test
    public void testCleanShutdown() {
        ByteBufferMessageSet byteBufferMessageSet;
        ObjectRef objectRef = new ObjectRef(new KafkaServer(config(), KafkaServer$.MODULE$.init$default$2()));
        ((KafkaServer) objectRef.elem).startup();
        Properties producerConfig = TestUtils$.MODULE$.getProducerConfig(TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaConfig[]{config()}))), TestUtils$.MODULE$.getProducerConfig$default$2());
        producerConfig.put("key.serializer.class", IntEncoder.class.getName().toString());
        Producer producer = new Producer(new ProducerConfig(producerConfig));
        AdminUtils$.MODULE$.createTopic(zkClient(), topic(), 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaServer[]{(KafkaServer) objectRef.elem})), topic(), 0, 1000L);
        producer.send((Seq) sent1().map(new ServerShutdownTest$$anonfun$testCleanShutdown$1(this), List$.MODULE$.canBuildFrom()));
        ((KafkaServer) objectRef.elem).shutdown();
        config().logDirs().foreach(new ServerShutdownTest$$anonfun$testCleanShutdown$2(this, objectRef));
        producer.close();
        objectRef.elem = new KafkaServer(config(), KafkaServer$.MODULE$.init$default$2());
        ((KafkaServer) objectRef.elem).startup();
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaServer[]{(KafkaServer) objectRef.elem})), topic(), 0, 1000L);
        Producer producer2 = new Producer(new ProducerConfig(producerConfig));
        SimpleConsumer simpleConsumer = new SimpleConsumer(host(), port(), 1000000, 65536, "");
        ByteBufferMessageSet byteBufferMessageSet2 = null;
        while (true) {
            byteBufferMessageSet = byteBufferMessageSet2;
            if (byteBufferMessageSet != null && byteBufferMessageSet.validBytes() != 0) {
                break;
            } else {
                byteBufferMessageSet2 = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic(), 0, 0L, 10000).maxWait(0).build()).messageSet(topic(), 0);
            }
        }
        Assert.assertEquals(sent1(), byteBufferMessageSet.map(new ServerShutdownTest$$anonfun$testCleanShutdown$3(this), Iterable$.MODULE$.canBuildFrom()));
        long nextOffset = ((MessageAndOffset) byteBufferMessageSet.last()).nextOffset();
        producer2.send((Seq) sent2().map(new ServerShutdownTest$$anonfun$testCleanShutdown$4(this), List$.MODULE$.canBuildFrom()));
        ByteBufferMessageSet byteBufferMessageSet3 = null;
        while (true) {
            ByteBufferMessageSet byteBufferMessageSet4 = byteBufferMessageSet3;
            if (byteBufferMessageSet4 != null && byteBufferMessageSet4.validBytes() != 0) {
                Assert.assertEquals(sent2(), byteBufferMessageSet4.map(new ServerShutdownTest$$anonfun$testCleanShutdown$5(this), Iterable$.MODULE$.canBuildFrom()));
                simpleConsumer.close();
                producer2.close();
                ((KafkaServer) objectRef.elem).shutdown();
                Utils$.MODULE$.rm(((KafkaServer) objectRef.elem).config().logDirs());
                verifyNonDaemonThreadsStatus();
                return;
            }
            byteBufferMessageSet3 = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic(), 0, nextOffset, 10000).build()).messageSet(topic(), 0);
        }
    }

    @Test
    public void testCleanShutdownWithDeleteTopicEnabled() {
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(0, port());
        createBrokerConfig.setProperty("delete.topic.enable", "true");
        KafkaServer kafkaServer = new KafkaServer(new KafkaConfig(createBrokerConfig), KafkaServer$.MODULE$.init$default$2());
        kafkaServer.startup();
        kafkaServer.shutdown();
        kafkaServer.awaitShutdown();
        Utils$.MODULE$.rm(kafkaServer.config().logDirs());
        verifyNonDaemonThreadsStatus();
    }

    public void verifyNonDaemonThreadsStatus() {
        Assert.assertEquals(0, Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(Thread.getAllStackTraces().keySet().toArray()).map(new ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$1(this), Array$.MODULE$.canBuildFrom(ClassManifest$.MODULE$.classType(Thread.class)))).count(new ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2(this)));
    }

    public ServerShutdownTest() {
        ZooKeeperTestHarness.Cclass.$init$(this);
        this.port = TestUtils$.MODULE$.createBrokerConfig$default$2();
        this.props = TestUtils$.MODULE$.createBrokerConfig(0, port());
        this.config = new KafkaConfig(props());
        this.host = "localhost";
        this.topic = "test";
        this.sent1 = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"hello", "there"}));
        this.sent2 = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"more", "messages"}));
    }
}
