/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.topologies;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.pulsar.tests.integration.containers.BKContainer;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.CSContainer;
import org.apache.pulsar.tests.integration.containers.PrestoWorkerContainer;
import org.apache.pulsar.tests.integration.containers.ProxyContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.WorkerContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

public class PulsarCluster {
    private static final Logger log = LoggerFactory.getLogger(PulsarCluster.class);
    public static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
    public static final String CLIENT_SCRIPT = "/pulsar/bin/pulsar-client";
    public static final String PULSAR_COMMAND_SCRIPT = "/pulsar/bin/pulsar";
    private final PulsarClusterSpec spec;
    private final String clusterName;
    private final Network network;
    private final ZKContainer zkContainer;
    private final CSContainer csContainer;
    private final Map<String, BKContainer> bookieContainers;
    private final Map<String, BrokerContainer> brokerContainers;
    private final Map<String, WorkerContainer> workerContainers;
    private final ProxyContainer proxyContainer;
    private PrestoWorkerContainer prestoWorkerContainer;
    private Map<String, PrestoWorkerContainer> sqlFollowWorkerContainers;
    private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap();
    private final boolean enablePrestoWorker;

    public static PulsarCluster forSpec(PulsarClusterSpec spec) {
        return new PulsarCluster(spec);
    }

    private PulsarCluster(PulsarClusterSpec spec) {
        this.spec = spec;
        this.clusterName = spec.clusterName();
        this.network = Network.newNetwork();
        this.enablePrestoWorker = spec.enablePrestoWorker();
        this.sqlFollowWorkerContainers = Maps.newTreeMap();
        this.prestoWorkerContainer = this.enablePrestoWorker ? this.buildPrestoWorkerContainer("presto-worker", true, null, null) : null;
        this.zkContainer = new ZKContainer(this.clusterName);
        this.zkContainer.withNetwork(this.network).withNetworkAliases(new String[]{"zookeeper"}).withEnv("clusterName", this.clusterName).withEnv("zkServers", "zookeeper").withEnv("configurationStore", "configuration-store:2184").withEnv("forceSync", "no").withEnv("pulsarNode", "pulsar-broker-0");
        this.csContainer = (CSContainer)((CSContainer)new CSContainer(this.clusterName).withNetwork(this.network)).withNetworkAliases(new String[]{"configuration-store"});
        this.bookieContainers = Maps.newTreeMap();
        this.brokerContainers = Maps.newTreeMap();
        this.workerContainers = Maps.newTreeMap();
        this.proxyContainer = (ProxyContainer)((ProxyContainer)((ProxyContainer)((ProxyContainer)((ProxyContainer)((ProxyContainer)new ProxyContainer(this.clusterName, "pulsar-proxy").withNetwork(this.network)).withNetworkAliases(new String[]{"pulsar-proxy"})).withEnv("zkServers", "zookeeper")).withEnv("zookeeperServers", "zookeeper")).withEnv("configurationStoreServers", "configuration-store:2184")).withEnv("clusterName", this.clusterName);
        if (spec.proxyEnvs != null) {
            spec.proxyEnvs.forEach((arg_0, arg_1) -> ((ProxyContainer)this.proxyContainer).withEnv(arg_0, arg_1));
        }
        if (spec.proxyMountFiles != null) {
            spec.proxyMountFiles.forEach((arg_0, arg_1) -> ((ProxyContainer)this.proxyContainer).withFileSystemBind(arg_0, arg_1));
        }
        this.bookieContainers.putAll(PulsarCluster.runNumContainers("bookie", spec.numBookies(), name -> (BKContainer)((BKContainer)((BKContainer)((BKContainer)((BKContainer)((BKContainer)((BKContainer)((BKContainer)((BKContainer)new BKContainer(this.clusterName, (String)name).withNetwork(this.network)).withNetworkAliases(new String[]{name})).withEnv("zkServers", "zookeeper")).withEnv("useHostNameAsBookieID", "true")).withEnv("journalSyncData", "false")).withEnv("journalMaxGroupWaitMSec", "0")).withEnv("clusterName", this.clusterName)).withEnv("diskUsageThreshold", "0.99")).withEnv("nettyMaxFrameSizeBytes", "" + spec.maxMessageSize)));
        this.brokerContainers.putAll(PulsarCluster.runNumContainers("broker", spec.numBrokers(), name -> {
            BrokerContainer brokerContainer = (BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)new BrokerContainer(this.clusterName, (String)name).withNetwork(this.network)).withNetworkAliases(new String[]{name})).withEnv("zkServers", "zookeeper")).withEnv("zookeeperServers", "zookeeper")).withEnv("configurationStoreServers", "configuration-store:2184")).withEnv("clusterName", this.clusterName)).withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")).withEnv("AWS_ACCESS_KEY_ID", "accesskey")).withEnv("AWS_SECRET_KEY", "secretkey")).withEnv("maxMessageSize", "" + spec.maxMessageSize);
            if (spec.queryLastMessage) {
                brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
                brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
            }
            if (spec.brokerEnvs != null) {
                brokerContainer.withEnv(spec.brokerEnvs);
            }
            if (spec.brokerMountFiles != null) {
                spec.brokerMountFiles.forEach((arg_0, arg_1) -> ((BrokerContainer)brokerContainer).withFileSystemBind(arg_0, arg_1));
            }
            return brokerContainer;
        }));
        spec.classPathVolumeMounts.forEach((key, value) -> {
            this.zkContainer.withClasspathResourceMapping((String)key, (String)value, BindMode.READ_WRITE);
            this.proxyContainer.withClasspathResourceMapping((String)key, (String)value, BindMode.READ_WRITE);
            this.bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
            this.brokerContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
            this.workerContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
        });
    }

    public String getPlainTextServiceUrl() {
        return this.proxyContainer.getPlainTextServiceUrl();
    }

    public String getHttpServiceUrl() {
        return this.proxyContainer.getHttpServiceUrl();
    }

    public String getAllBrokersHttpServiceUrl() {
        String multiUrl = "http://";
        Iterator<BrokerContainer> brokers = this.getBrokers().iterator();
        while (brokers.hasNext()) {
            BrokerContainer broker = brokers.next();
            multiUrl = multiUrl + broker.getContainerIpAddress() + ":" + broker.getMappedPort(8080);
            if (!brokers.hasNext()) continue;
            multiUrl = multiUrl + ",";
        }
        return multiUrl;
    }

    public String getZKConnString() {
        return this.zkContainer.getContainerIpAddress() + ":" + this.zkContainer.getMappedPort(2181);
    }

    public String getCSConnString() {
        return this.csContainer.getContainerIpAddress() + ":" + this.csContainer.getMappedPort(2184);
    }

    public Network getNetwork() {
        return this.network;
    }

    public Map<String, GenericContainer<?>> getExternalServices() {
        return this.externalServices;
    }

    public void start() throws Exception {
        this.zkContainer.start();
        log.info("Successfully started local zookeeper container.");
        this.csContainer.start();
        log.info("Successfully started configuration store container.");
        this.zkContainer.execCmd("bin/init-cluster.sh");
        log.info("Successfully initialized the cluster.");
        this.bookieContainers.values().forEach(PulsarContainer::start);
        log.info("Successfully started {} bookie containers.", (Object)this.bookieContainers.size());
        this.startAllBrokers();
        log.info("Successfully started {} broker containers.", (Object)this.brokerContainers.size());
        this.proxyContainer.start();
        log.info("Successfully started pulsar proxy.");
        log.info("Pulsar cluster {} is up running:", (Object)this.clusterName);
        log.info("\tBinary Service Url : {}", (Object)this.getPlainTextServiceUrl());
        log.info("\tHttp Service Url : {}", (Object)this.getHttpServiceUrl());
        if (this.enablePrestoWorker) {
            log.info("Starting Presto Worker");
            this.prestoWorkerContainer.start();
        }
        this.externalServices = this.spec.externalServices;
        if (null != this.externalServices) {
            this.externalServices.entrySet().parallelStream().forEach(service -> {
                GenericContainer serviceContainer = (GenericContainer)service.getValue();
                serviceContainer.withNetwork(this.network);
                serviceContainer.withNetworkAliases(new String[]{(String)service.getKey()});
                PulsarContainer.configureLeaveContainerRunning(serviceContainer);
                serviceContainer.start();
                log.info("Successfully start external service {}.", service.getKey());
            });
        }
    }

    public void startService(String networkAlias, GenericContainer<?> serviceContainer) {
        log.info("Starting external service {} ...", (Object)networkAlias);
        serviceContainer.withNetwork(this.network);
        serviceContainer.withNetworkAliases(new String[]{networkAlias});
        PulsarContainer.configureLeaveContainerRunning(serviceContainer);
        serviceContainer.start();
        log.info("Successfully start external service {}", (Object)networkAlias);
    }

    public static void stopService(String networkAlias, GenericContainer<?> serviceContainer) {
        if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
            PulsarCluster.logIgnoringStopDueToLeaveRunning();
            return;
        }
        log.info("Stopping external service {} ...", (Object)networkAlias);
        serviceContainer.stop();
        log.info("Successfully stop external service {}", (Object)networkAlias);
    }

    private static <T extends PulsarContainer> Map<String, T> runNumContainers(String serviceName, int numContainers, Function<String, T> containerCreator) {
        TreeMap containers = Maps.newTreeMap();
        for (int i = 0; i < numContainers; ++i) {
            String name = "pulsar-" + serviceName + "-" + i;
            PulsarContainer container = (PulsarContainer)((Object)containerCreator.apply(name));
            containers.put(name, container);
        }
        return containers;
    }

    public PrestoWorkerContainer getPrestoWorkerContainer() {
        return this.prestoWorkerContainer;
    }

    public synchronized void stop() {
        if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
            PulsarCluster.logIgnoringStopDueToLeaveRunning();
            return;
        }
        List<Object> containers = new ArrayList();
        containers.addAll(this.workerContainers.values());
        containers.addAll(this.brokerContainers.values());
        containers.addAll(this.bookieContainers.values());
        if (this.externalServices != null) {
            containers.addAll(this.externalServices.values());
        }
        if (null != this.proxyContainer) {
            containers.add((Object)this.proxyContainer);
        }
        if (null != this.csContainer) {
            containers.add((Object)this.csContainer);
        }
        if (null != this.zkContainer) {
            containers.add((Object)this.zkContainer);
        }
        if (null != this.prestoWorkerContainer) {
            containers.add((Object)this.prestoWorkerContainer);
        }
        containers = containers.parallelStream().filter(Objects::nonNull).collect(Collectors.toList());
        containers.parallelStream().forEach(GenericContainer::stop);
        try {
            this.network.close();
        }
        catch (Exception e) {
            log.info("Failed to shutdown network for pulsar cluster {}", (Object)this.clusterName, (Object)e);
        }
    }

    public void startPrestoWorker() {
        this.startPrestoWorker(null, null);
    }

    public void startPrestoWorker(String offloadDriver, String offloadProperties) {
        log.info("[startPrestoWorker] offloadDriver: {}, offloadProperties: {}", (Object)offloadDriver, (Object)offloadProperties);
        if (null == this.prestoWorkerContainer) {
            this.prestoWorkerContainer = this.buildPrestoWorkerContainer("presto-worker", true, offloadDriver, offloadProperties);
        }
        this.prestoWorkerContainer.start();
        log.info("[{}] Presto coordinator start finished.", (Object)this.prestoWorkerContainer.getContainerName());
    }

    public void stopPrestoWorker() {
        if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
            PulsarCluster.logIgnoringStopDueToLeaveRunning();
            return;
        }
        if (this.sqlFollowWorkerContainers != null && this.sqlFollowWorkerContainers.size() > 0) {
            for (PrestoWorkerContainer followWorker : this.sqlFollowWorkerContainers.values()) {
                followWorker.stop();
                log.info("Stopped presto follow worker {}.", (Object)followWorker.getContainerName());
            }
            this.sqlFollowWorkerContainers.clear();
            log.info("Stopped all presto follow workers.");
        }
        if (null != this.prestoWorkerContainer) {
            this.prestoWorkerContainer.stop();
            log.info("Stopped presto coordinator.");
            this.prestoWorkerContainer = null;
        }
    }

    public void startPrestoFollowWorkers(int numSqlFollowWorkers, String offloadDriver, String offloadProperties) {
        log.info("start presto follow worker containers.");
        this.sqlFollowWorkerContainers.putAll(PulsarCluster.runNumContainers("sql-follow-worker", numSqlFollowWorkers, name -> {
            log.info("build presto follow worker with name {}", name);
            return this.buildPrestoWorkerContainer((String)name, false, offloadDriver, offloadProperties);
        }));
        this.sqlFollowWorkerContainers.values().parallelStream().forEach(PulsarContainer::start);
        log.info("Successfully started {} presto follow worker containers.", (Object)this.sqlFollowWorkerContainers.size());
    }

    private PrestoWorkerContainer buildPrestoWorkerContainer(String hostName, boolean isCoordinator, String offloadDriver, String offloadProperties) {
        String resourcePath = isCoordinator ? "presto-coordinator-config.properties" : "presto-follow-worker-config.properties";
        PrestoWorkerContainer container = (PrestoWorkerContainer)((PrestoWorkerContainer)((PrestoWorkerContainer)((PrestoWorkerContainer)((PrestoWorkerContainer)((PrestoWorkerContainer)((PrestoWorkerContainer)((PrestoWorkerContainer)((PrestoWorkerContainer)new PrestoWorkerContainer(this.clusterName, hostName).withNetwork(this.network)).withNetworkAliases(new String[]{hostName})).withEnv("clusterName", this.clusterName)).withEnv("zkServers", "zookeeper")).withEnv("zookeeperServers", "zookeeper:2181")).withEnv("pulsar.zookeeper-uri", "zookeeper:2181")).withEnv("pulsar.web-service-url", "http://pulsar-broker-0:8080")).withEnv("SQL_PREFIX_pulsar.max-message-size", "" + this.spec.maxMessageSize)).withClasspathResourceMapping(resourcePath, "/pulsar/conf/presto/config.properties", BindMode.READ_WRITE);
        if (this.spec.queryLastMessage) {
            ((PrestoWorkerContainer)container.withEnv("pulsar.bookkeeper-use-v2-protocol", "false")).withEnv("pulsar.bookkeeper-explicit-interval", "10");
        }
        if (offloadDriver != null && offloadProperties != null) {
            log.info("[startPrestoWorker] set offload env offloadDriver: {}, offloadProperties: {}", (Object)offloadDriver, (Object)offloadProperties);
            container.withEnv("SQL_PREFIX_pulsar.managed-ledger-offload-driver", offloadDriver);
            container.withEnv("SQL_PREFIX_pulsar.offloader-properties", offloadProperties);
            container.withEnv("SQL_PREFIX_pulsar.offloaders-directory", "/pulsar/offloaders");
            container.withEnv("AWS_ACCESS_KEY_ID", "accesskey");
            container.withEnv("AWS_SECRET_KEY", "secretkey");
        }
        log.info("[{}] build presto worker container. isCoordinator: {}, resourcePath: {}", new Object[]{container.getContainerName(), isCoordinator, resourcePath});
        return container;
    }

    public synchronized void setupFunctionWorkers(String suffix, FunctionRuntimeType runtimeType, int numFunctionWorkers) {
        switch (runtimeType) {
            case THREAD: {
                this.startFunctionWorkersWithThreadContainerFactory(suffix, numFunctionWorkers);
                break;
            }
            case PROCESS: {
                this.startFunctionWorkersWithProcessContainerFactory(suffix, numFunctionWorkers);
            }
        }
    }

    private void startFunctionWorkersWithProcessContainerFactory(String suffix, int numFunctionWorkers) {
        String serviceUrl = "pulsar://pulsar-broker-0:6650";
        String httpServiceUrl = "http://pulsar-broker-0:8080";
        this.workerContainers.putAll(PulsarCluster.runNumContainers("functions-worker-process-" + suffix, numFunctionWorkers, name -> (WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)new WorkerContainer(this.clusterName, (String)name).withNetwork(this.network)).withNetworkAliases(new String[]{name})).withEnv("PF_workerId", (String)name)).withEnv("PF_workerHostname", (String)name)).withEnv("PF_workerPort", "8080")).withEnv("PF_pulsarFunctionsCluster", this.clusterName)).withEnv("PF_pulsarServiceUrl", serviceUrl)).withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)).withEnv("clusterName", this.clusterName)).withEnv("zookeeperServers", "zookeeper")).withEnv("zkServers", "zookeeper")));
        this.startWorkers();
    }

    private void startFunctionWorkersWithThreadContainerFactory(String suffix, int numFunctionWorkers) {
        String serviceUrl = "pulsar://pulsar-broker-0:6650";
        String httpServiceUrl = "http://pulsar-broker-0:8080";
        this.workerContainers.putAll(PulsarCluster.runNumContainers("functions-worker-thread-" + suffix, numFunctionWorkers, name -> (WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)((WorkerContainer)new WorkerContainer(this.clusterName, (String)name).withNetwork(this.network)).withNetworkAliases(new String[]{name})).withEnv("PF_workerId", (String)name)).withEnv("PF_workerHostname", (String)name)).withEnv("PF_workerPort", "8080")).withEnv("PF_pulsarFunctionsCluster", this.clusterName)).withEnv("PF_pulsarServiceUrl", serviceUrl)).withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)).withEnv("PF_functionRuntimeFactoryClassName", "org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory")).withEnv("PF_functionRuntimeFactoryConfigs_threadGroupName", "pf-container-group")).withEnv("clusterName", this.clusterName)).withEnv("zookeeperServers", "zookeeper")).withEnv("zkServers", "zookeeper")));
        this.startWorkers();
    }

    public synchronized void startWorkers() {
        this.workerContainers.values().parallelStream().forEach(PulsarContainer::start);
        log.info("Successfully started {} worker containers.", (Object)this.workerContainers.size());
    }

    public synchronized void stopWorkers() {
        if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
            PulsarCluster.logIgnoringStopDueToLeaveRunning();
            return;
        }
        this.workerContainers.values().parallelStream().forEach(PulsarContainer::stop);
        this.workerContainers.clear();
    }

    public void startContainers(Map<String, GenericContainer<?>> containers) {
        containers.forEach((name, container) -> {
            PulsarContainer.configureLeaveContainerRunning(container);
            container.withNetwork(this.network).withNetworkAliases(new String[]{name}).start();
            log.info("Successfully start container {}.", name);
        });
    }

    public static void stopContainers(Map<String, GenericContainer<?>> containers) {
        if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
            PulsarCluster.logIgnoringStopDueToLeaveRunning();
            return;
        }
        containers.values().parallelStream().forEach(GenericContainer::stop);
        log.info("Successfully stop containers : {}", containers);
    }

    private static void logIgnoringStopDueToLeaveRunning() {
        log.warn("Ignoring stop due to PULSAR_CONTAINERS_LEAVE_RUNNING=true.");
    }

    public BrokerContainer getAnyBroker() {
        return this.getAnyContainer(this.brokerContainers, "pulsar-broker");
    }

    public synchronized WorkerContainer getAnyWorker() {
        return this.getAnyContainer(this.workerContainers, "pulsar-functions-worker");
    }

    public BrokerContainer getBroker(int index) {
        return this.getAnyContainer(this.brokerContainers, "pulsar-broker", index);
    }

    public synchronized WorkerContainer getWorker(int index) {
        return this.getAnyContainer(this.workerContainers, "pulsar-functions-worker", index);
    }

    private <T> T getAnyContainer(Map<String, T> containers, String serviceName) {
        ArrayList containerList = Lists.newArrayList();
        containerList.addAll(containers.values());
        Collections.shuffle(containerList);
        Preconditions.checkArgument((!containerList.isEmpty() ? 1 : 0) != 0, (Object)("No " + serviceName + " is alive"));
        return (T)containerList.get(0);
    }

    private <T> T getAnyContainer(Map<String, T> containers, String serviceName, int index) {
        Preconditions.checkArgument((!containers.isEmpty() ? 1 : 0) != 0, (Object)("No " + serviceName + " is alive"));
        Preconditions.checkArgument((index >= 0 && index < containers.size() ? 1 : 0) != 0, (Object)("Index : " + index + " is out range"));
        return containers.get(serviceName.toLowerCase() + "-" + index);
    }

    public Collection<BrokerContainer> getBrokers() {
        return this.brokerContainers.values();
    }

    public ProxyContainer getProxy() {
        return this.proxyContainer;
    }

    public Collection<BKContainer> getBookies() {
        return this.bookieContainers.values();
    }

    public ZKContainer getZooKeeper() {
        return this.zkContainer;
    }

    public ContainerExecResult runAdminCommandOnAnyBroker(String ... commands) throws Exception {
        return this.runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
    }

    public ContainerExecResult runPulsarBaseCommandOnAnyBroker(String ... commands) throws Exception {
        return this.runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT, commands);
    }

    private ContainerExecResult runCommandOnAnyBrokerWithScript(String scriptType, String ... commands) throws Exception {
        BrokerContainer container = this.getAnyBroker();
        String[] cmds = new String[commands.length + 1];
        cmds[0] = scriptType;
        System.arraycopy(commands, 0, cmds, 1, commands.length);
        return container.execCmd(cmds);
    }

    public void stopAllBrokers() {
        this.brokerContainers.values().forEach(PulsarContainer::stop);
    }

    public void startAllBrokers() {
        this.brokerContainers.values().forEach(PulsarContainer::start);
    }

    public void stopAllBookies() {
        this.bookieContainers.values().forEach(PulsarContainer::stop);
    }

    public void startAllBookies() {
        this.bookieContainers.values().forEach(PulsarContainer::start);
    }

    public void stopZooKeeper() {
        this.zkContainer.stop();
    }

    public void startZooKeeper() {
        this.zkContainer.start();
    }

    public ContainerExecResult createNamespace(String nsName) throws Exception {
        return this.runAdminCommandOnAnyBroker("namespaces", "create", "public/" + nsName, "--clusters", this.clusterName);
    }

    public ContainerExecResult createPartitionedTopic(String topicName, int partitions) throws Exception {
        return this.runAdminCommandOnAnyBroker("topics", "create-partitioned-topic", topicName, "-p", String.valueOf(partitions));
    }

    public ContainerExecResult enableDeduplication(String nsName, boolean enabled) throws Exception {
        return this.runAdminCommandOnAnyBroker("namespaces", "set-deduplication", "public/" + nsName, enabled ? "--enable" : "--disable");
    }

    public PulsarClusterSpec getSpec() {
        return this.spec;
    }

    public String getClusterName() {
        return this.clusterName;
    }

    public Map<String, PrestoWorkerContainer> getSqlFollowWorkerContainers() {
        return this.sqlFollowWorkerContainers;
    }
}

