/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.http.server.integration;

import io.confluent.http.server.KafkaHttpServerConfig;
import io.confluent.http.server.KafkaHttpServerImpl;
import io.confluent.http.server.integration.fixtures.FooBarApplication;
import io.confluent.kafka.http.server.KafkaHttpServer;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Integration tests for the HTTP server hosted by a Kafka broker.
 *
 * <p>Most tests start an {@link EmbeddedKafkaCluster} with ZooKeeper and a single broker,
 * configure the broker through listener properties, and then inspect the broker's
 * {@code httpServer()} or call the endpoints it serves. Every cluster is shut down in a
 * {@code finally} block so that a failing assertion does not leave a broker running for the
 * tests that follow.
 */
@RunWith(JUnit4.class)
public class KafkaHttpServerImplIntegrationTest {
    private static final String HTTP_SERVER_LISTENERS = "confluent.http.server.listeners";
    private static final String METADATA_SERVER_LISTENERS = "confluent.metadata.server.listeners";

    /**
     * With only the HTTP server listener configured, the broker's HTTP server starts and
     * {@code GET /foobar} on that listener returns {@code "foobar"}.
     */
    @Test
    public void serverEnabled_httpServer_runsServerAndApplication_onHttpServerListener() throws Exception {
        String uri = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startZooKeeper();
        try {
            Properties brokerProperties = new Properties();
            brokerProperties.put(HTTP_SERVER_LISTENERS, uri);
            cluster.startBrokers(1, brokerProperties);
            ((KafkaHttpServer) firstBroker(cluster).httpServer().get()).awaitStarted();

            Assert.assertEquals("foobar", fetchFooBar(uri));
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * A standalone server configured with port {@code 0} reports a non-zero primary port once
     * it has started.
     */
    @Test
    public void serverEnabled_getDynamicPort() throws Exception {
        HashMap<String, String> conf = new HashMap<String, String>();
        conf.put("listeners", "http://localhost:0");
        KafkaHttpServerImpl server = new KafkaHttpServerImpl(
                Collections.singletonList(new FooBarApplication()), new KafkaHttpServerConfig(conf));
        server.start();
        try {
            server.awaitStarted();
            Assert.assertNotEquals(0L, (long) server.getPrimaryPort());
        } finally {
            // Stop the server even when the assertion above fails.
            server.stop();
            server.awaitStopped();
        }
    }

    /**
     * {@link FooBarApplication#wasShutdown()} is {@code false} while the broker is running and
     * {@code true} after the cluster has been shut down.
     */
    @Test
    public void serverEnabled_httpServer_shutdownServerAndApplication() throws Exception {
        String uri = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startZooKeeper();
        try {
            Properties brokerProperties = new Properties();
            brokerProperties.put(HTTP_SERVER_LISTENERS, uri);
            cluster.startBrokers(1, brokerProperties);
            Assert.assertFalse(FooBarApplication.wasShutdown());
        } finally {
            cluster.shutdown();
        }
        Assert.assertTrue(FooBarApplication.wasShutdown());
    }

    /**
     * With only the metadata server listener configured, the broker's HTTP server starts and
     * {@code GET /foobar} on that listener returns {@code "foobar"}.
     */
    @Test
    public void serverEnabled_metadataServer_runsServerAndApplication_onMetadataServerListener() throws Exception {
        String uri = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startZooKeeper();
        try {
            Properties brokerProperties = new Properties();
            brokerProperties.put(METADATA_SERVER_LISTENERS, uri);
            cluster.startBrokers(1, brokerProperties);
            KafkaHttpServerImpl server = (KafkaHttpServerImpl) firstBroker(cluster).httpServer().get();
            server.awaitStarted();

            Assert.assertEquals("foobar", fetchFooBar(uri));
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * With both listener properties set to the same URI, the broker's HTTP server starts and
     * {@code GET /foobar} on that URI returns {@code "foobar"}.
     */
    @Test
    public void serverEnabled_httpServerAndMetadataServer_runsServerAndApplication_onMetadataServerListener() throws Exception {
        String uri = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startZooKeeper();
        try {
            Properties brokerProperties = new Properties();
            brokerProperties.put(HTTP_SERVER_LISTENERS, uri);
            brokerProperties.put(METADATA_SERVER_LISTENERS, uri);
            cluster.startBrokers(1, brokerProperties);
            ((KafkaHttpServer) firstBroker(cluster).httpServer().get()).awaitStarted();

            Assert.assertEquals("foobar", fetchFooBar(uri));
        } finally {
            cluster.shutdown();
        }
    }

    /** With both listener properties empty, the broker exposes no HTTP server. */
    @Test
    public void serverDisabled_doesNotRunServer() throws Exception {
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startZooKeeper();
        try {
            Properties brokerProperties = new Properties();
            brokerProperties.put(HTTP_SERVER_LISTENERS, "");
            brokerProperties.put(METADATA_SERVER_LISTENERS, "");
            cluster.startBrokers(1, brokerProperties);

            Assert.assertFalse(firstBroker(cluster).httpServer().isDefined());
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * With a listener configured but {@code foo.bar.enabled=false}, the broker exposes no HTTP
     * server.
     */
    @Test
    public void serverEnabled_applicationDisabled_doesNotRunServer() throws Exception {
        String uri = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startZooKeeper();
        try {
            Properties brokerProperties = new Properties();
            brokerProperties.put(HTTP_SERVER_LISTENERS, uri);
            brokerProperties.put("foo.bar.enabled", "false");
            cluster.startBrokers(1, brokerProperties);

            Assert.assertFalse(firstBroker(cluster).httpServer().isDefined());
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * With {@code exceptional.enabled=true}, a failure raised while starting the broker must be
     * a {@link ConfigException} with the message {@code "Failed on resource configuration"}.
     */
    @Test
    public void serverEnabled_applicationException_stopBroker() throws IOException {
        String uri = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startZooKeeper();
        Properties brokerProperties = new Properties();
        brokerProperties.put(HTTP_SERVER_LISTENERS, uri);
        brokerProperties.put("exceptional.enabled", "true");
        try {
            cluster.startBrokers(1, brokerProperties);
            // NOTE(review): if startup unexpectedly succeeds, nothing fails here and the test
            // passes vacuously. Confirm whether an explicit failure is wanted for that case.
        } catch (Throwable t) {
            // A JUnit assertion rather than the `assert` keyword, so the type check still runs
            // when the JVM is started without -ea.
            Assert.assertTrue("unexpected failure type: " + t, t instanceof ConfigException);
            Assert.assertEquals("Failed on resource configuration", t.getMessage());
        } finally {
            cluster.shutdown();
        }
    }

    /** The broker's HTTP server reports the port it was configured to listen on. */
    @Test
    public void serverEnabled_exposes_port() throws Exception {
        int port = findUnusedPort();
        String uri = "http://localhost:" + port;
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startZooKeeper();
        try {
            Properties brokerProperties = new Properties();
            brokerProperties.put(HTTP_SERVER_LISTENERS, uri);
            brokerProperties.put("foo.bar.enabled", "true");
            cluster.startBrokers(1, brokerProperties);
            KafkaHttpServerImpl server = (KafkaHttpServerImpl) firstBroker(cluster).httpServer().get();

            Assert.assertEquals((long) port, (long) server.getPrimaryPort());
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * After a broker config is altered through the admin client, the three
     * {@code reconfigure/*} endpoints each return {@code true}.
     */
    @Test
    public void serverEnabled_ReconfigurableApp() throws Exception {
        HashMap<String, Object> testContext = new HashMap<String, Object>();
        int port = findUnusedPort();
        String uri = "http://localhost:" + port;
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startZooKeeper();
        try {
            Properties brokerProperties = new Properties();
            brokerProperties.put(HTTP_SERVER_LISTENERS, uri);
            brokerProperties.put("foo.bar.enabled", "true");
            brokerProperties.put("test.context", testContext);
            cluster.startBrokers(1, brokerProperties);
            KafkaHttpServerImpl server = (KafkaHttpServerImpl) firstBroker(cluster).httpServer().get();
            server.awaitStarted();

            Properties adminProps = new Properties();
            adminProps.put("bootstrap.servers", cluster.bootstrapServers());
            // try-with-resources closes the admin client even if the alter call fails.
            try (AdminClient admin = AdminClient.create(adminProps)) {
                // Empty resource name: per Kafka's ConfigResource documentation this addresses
                // the cluster-wide default broker config rather than one broker id.
                ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
                AlterConfigOp setOp =
                        new AlterConfigOp(new ConfigEntry("config-1", "value1"), AlterConfigOp.OpType.SET);
                // The map type is inferred from the method parameter, which avoids the
                // List-vs-Collection mismatch of a locally declared HashMap.
                admin.incrementalAlterConfigs(
                                Collections.singletonMap(brokerDefaults, Collections.singletonList(setOp)),
                                new AlterConfigsOptions())
                        .all()
                        .get();
            }

            WebTarget target = ClientBuilder.newClient().target(uri);
            String[] endpoints = {"reconfigure/configs", "reconfigure/validate", "reconfigure/reconfigure"};
            for (String endpoint : endpoints) {
                boolean result = target.path(endpoint).request().get().readEntity(boolean.class);
                Assert.assertTrue("GET " + endpoint + " returned false", result);
            }
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Returns the first broker of the given cluster.
     *
     * @param cluster a cluster on which brokers have been started
     * @return the broker at index 0 of {@code cluster.brokers()}
     */
    private static KafkaServer firstBroker(EmbeddedKafkaCluster cluster) {
        // Cast retained because the element type of brokers() is not visible from this file.
        return (KafkaServer) cluster.brokers().get(0);
    }

    /**
     * Issues {@code GET <uri>/foobar}, accepting {@code text/plain}, and returns the response
     * body as a string.
     *
     * @param uri base URI of the listener to query
     * @return the response entity read as a {@link String}
     */
    private static String fetchFooBar(String uri) {
        return ClientBuilder.newClient()
                .target(uri)
                .path("foobar")
                .request()
                .accept("text/plain")
                .get()
                .readEntity(String.class);
    }

    /**
     * Asks the OS for a free port by briefly binding a server socket to port 0.
     *
     * <p>The socket is closed before returning, so the port is only known to have been free at
     * the time of the call.
     *
     * @return the port number the temporary socket was bound to
     * @throws IOException if the socket cannot be opened
     */
    private static int findUnusedPort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }
}

