package co.cask.cdap.common.guice;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.Tasks;
import com.google.common.collect.ImmutableMultimap;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
import org.apache.twill.internal.zookeeper.DefaultZKClientService;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link KafkaClientModule}.
 *
 * <p>Each test starts an in-memory ZooKeeper server and an embedded Kafka broker, builds an injector
 * from {@link ConfigModule}, {@link ZKClientModule} and {@link KafkaClientModule}, and then checks how
 * the number of connections reported by the ZooKeeper server changes as the Kafka client and broker
 * services are started and stopped.
 */
public class KafkaClientModuleTest {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /** Configuration key for the ZooKeeper namespace (chroot path) used by Kafka. */
    private static final String KAFKA_ZK_NAMESPACE_KEY = "kafka.zookeeper.namespace";
    /** Configuration key for the ZooKeeper quorum used by the general ZK client. */
    private static final String ZK_QUORUM_KEY = "zookeeper.quorum";
    /** Configuration key for the ZooKeeper quorum used by the Kafka client. */
    private static final String KAFKA_ZK_QUORUM_KEY = "kafka.zookeeper.quorum";

    private InMemoryZKServer zkServer;
    private EmbeddedKafkaServer kafkaServer;
    private String kafkaZKConnect;

    /**
     * Starts the in-memory ZooKeeper server and the embedded Kafka broker.
     *
     * <p>If a Kafka ZooKeeper namespace is configured, the corresponding node is created first and
     * appended to the connection string handed to Kafka.
     */
    @Before
    public void beforeTest() throws Exception {
        this.zkServer = InMemoryZKServer.builder().setDataDir(TEMP_FOLDER.newFolder()).build();
        this.zkServer.startAndWait();

        String kafkaZKNamespace = CConfiguration.create().get(KAFKA_ZK_NAMESPACE_KEY);
        this.kafkaZKConnect = this.zkServer.getConnectionStr();

        if (kafkaZKNamespace != null) {
            // Short-lived client used only to create the namespace node; no connection watcher.
            DefaultZKClientService zkClient = new DefaultZKClientService(
                this.zkServer.getConnectionStr(), 2000, (Watcher) null, ImmutableMultimap.of());
            zkClient.startAndWait();
            try {
                // Wait for the creation to finish so that a failure surfaces here instead of
                // showing up later as a Kafka startup problem.
                zkClient.create("/" + kafkaZKNamespace, (byte[]) null, CreateMode.PERSISTENT).get();
            } finally {
                // Always release the temporary client, even if the node could not be created.
                zkClient.stopAndWait();
            }
            this.kafkaZKConnect += "/" + kafkaZKNamespace;
        }

        this.kafkaServer = createKafkaServer(this.kafkaZKConnect, TEMP_FOLDER.newFolder());
        this.kafkaServer.startAndWait();
    }

    /**
     * Stops the Kafka broker and then the ZooKeeper server.
     *
     * <p>Either field may still be {@code null} when {@link #beforeTest()} failed part-way, and the
     * ZooKeeper server is stopped even if stopping Kafka throws.
     */
    @After
    public void afterTest() {
        try {
            if (this.kafkaServer != null) {
                this.kafkaServer.stopAndWait();
            }
        } finally {
            if (this.zkServer != null) {
                this.zkServer.stopAndWait();
            }
        }
    }

    /**
     * Only the general ZooKeeper quorum is configured. Starting and stopping the Kafka client and the
     * broker service must not change the number of ZooKeeper connections, and must leave the
     * injected {@link ZKClientService} running.
     */
    @Test
    public void testWithSharedZKClient() throws Exception {
        CConfiguration cConf = CConfiguration.create();
        cConf.set(ZK_QUORUM_KEY, this.zkServer.getConnectionStr());
        Injector injector = createInjector(cConf);

        ZKClientService zkClient = injector.getInstance(ZKClientService.class);
        zkClient.startAndWait();
        try {
            int baseConnections = getZKConnections();

            KafkaClientService kafkaClient = injector.getInstance(KafkaClientService.class);
            BrokerService brokerService = injector.getInstance(BrokerService.class);
            kafkaClient.startAndWait();
            brokerService.startAndWait();

            // No extra ZooKeeper connection is expected for the Kafka services.
            Assert.assertTrue(zkClient.isRunning());
            Assert.assertEquals(baseConnections, getZKConnections());

            waitForBrokers(brokerService);

            kafkaClient.stopAndWait();
            brokerService.stopAndWait();

            // Stopping the Kafka services must not stop the ZK client or drop its connection.
            Assert.assertTrue(zkClient.isRunning());
            Assert.assertEquals(baseConnections, getZKConnections());
        } finally {
            // Stop the client even when an assertion above fails.
            zkClient.stopAndWait();
        }
    }

    /**
     * A separate Kafka ZooKeeper quorum is configured. Starting the Kafka client must open exactly one
     * additional ZooKeeper connection, the broker service must not open another one, and the extra
     * connection must go away only when the Kafka client is stopped.
     */
    @Test
    public void testWithDedicatedZKClient() throws Exception {
        CConfiguration cConf = CConfiguration.create();
        cConf.set(ZK_QUORUM_KEY, this.zkServer.getConnectionStr());
        cConf.set(KAFKA_ZK_QUORUM_KEY, this.kafkaZKConnect);
        Injector injector = createInjector(cConf);

        ZKClientService zkClient = injector.getInstance(ZKClientService.class);
        zkClient.startAndWait();
        try {
            int baseConnections = getZKConnections();

            KafkaClientService kafkaClient = injector.getInstance(KafkaClientService.class);
            BrokerService brokerService = injector.getInstance(BrokerService.class);

            kafkaClient.startAndWait();
            Assert.assertEquals(baseConnections + 1, getZKConnections());

            brokerService.startAndWait();
            Assert.assertEquals(baseConnections + 1, getZKConnections());

            waitForBrokers(brokerService);
            Assert.assertTrue(zkClient.isRunning());

            // Stopping the broker service alone keeps the extra connection open.
            brokerService.stopAndWait();
            Assert.assertEquals(baseConnections + 1, getZKConnections());

            // Stopping the Kafka client releases the extra connection.
            kafkaClient.stopAndWait();
            Assert.assertEquals(baseConnections, getZKConnections());
            Assert.assertTrue(zkClient.isRunning());
        } finally {
            // Stop the client even when an assertion above fails.
            zkClient.stopAndWait();
        }
    }

    /**
     * Creates the injector under test from the given configuration.
     *
     * @param cConf configuration to bind through {@link ConfigModule}
     * @return an injector containing the config, ZK client and Kafka client modules
     */
    private static Injector createInjector(CConfiguration cConf) {
        return Guice.createInjector(new ConfigModule(cConf), new ZKClientModule(), new KafkaClientModule());
    }

    /**
     * Polls the given broker service until it reports at least one broker.
     *
     * <p>The wait is bounded by the arguments passed to {@code Tasks.waitFor}: 5 seconds and
     * 100 milliseconds (presumably the overall timeout and the polling interval).
     *
     * @param brokerService the started broker service to poll
     * @throws Exception if {@code Tasks.waitFor} fails or the condition is not met in time
     */
    private static void waitForBrokers(final BrokerService brokerService) throws Exception {
        Tasks.waitFor(true, new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return brokerService.getBrokers().iterator().hasNext();
            }
        }, 5L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
    }

    /**
     * Sends the {@code cons} command to the ZooKeeper server over a raw socket and counts the lines
     * of the response.
     *
     * <p>The tests only compare the returned values with each other, so any fixed number of lines in
     * the response that are not client connections cancels out.
     *
     * @return the number of lines in the server's response
     * @throws IOException if the socket cannot be opened, written to or read from
     */
    private int getZKConnections() throws IOException {
        InetSocketAddress zkAddress = this.zkServer.getLocalAddress();
        try (Socket socket = new Socket(zkAddress.getAddress(), zkAddress.getPort())) {
            socket.getOutputStream().write("cons".getBytes(StandardCharsets.ISO_8859_1));
            BufferedReader reader = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1));
            int lineCount = 0;
            // Read until the server closes the connection.
            while (reader.readLine() != null) {
                lineCount++;
            }
            return lineCount;
        }
    }

    /**
     * Builds (but does not start) an embedded Kafka broker.
     *
     * @param str the ZooKeeper connection string for the broker ({@code zookeeper.connect})
     * @param file the directory used as the broker's log directory ({@code log.dir})
     * @return the configured, not yet started, Kafka server
     */
    private EmbeddedKafkaServer createKafkaServer(String str, File file) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("broker.id", "1");
        properties.setProperty("num.network.threads", "2");
        properties.setProperty("num.io.threads", "2");
        properties.setProperty("socket.send.buffer.bytes", "1048576");
        properties.setProperty("socket.receive.buffer.bytes", "1048576");
        properties.setProperty("socket.request.max.bytes", "104857600");
        properties.setProperty("log.dir", file.getAbsolutePath());
        properties.setProperty("num.partitions", "1");
        properties.setProperty("log.flush.interval.messages", "10000");
        properties.setProperty("log.flush.interval.ms", "1000");
        properties.setProperty("log.retention.hours", "1");
        properties.setProperty("log.segment.bytes", "536870912");
        properties.setProperty("log.cleanup.interval.mins", "1");
        properties.setProperty("zookeeper.connect", str);
        properties.setProperty("zookeeper.connection.timeout.ms", "1000000");
        return new EmbeddedKafkaServer(properties);
    }
}
