/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.topologies;

import com.github.dockerjava.api.exception.NotFoundException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.tests.integration.containers.BKContainer;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.CSContainer;
import org.apache.pulsar.tests.integration.containers.ProxyContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.WorkerContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

/**
 * Test harness that assembles a Pulsar cluster out of containers: one ZooKeeper container, one
 * configuration-store container, a configurable number of bookies and brokers, one proxy and,
 * on demand, function workers and arbitrary external services.
 *
 * <p>All containers are attached to the network of the configuration-store container. When the
 * configuration store is shared (see {@link #forSpec(PulsarClusterSpec, CSContainer)}), the
 * ZooKeeper, bookie, broker and proxy network aliases are prefixed with the cluster name.
 *
 * <p>Only the worker-related methods and {@link #stop()} are {@code synchronized}; the other
 * lifecycle methods are not.
 */
public class PulsarCluster {
    private static final Logger log = LoggerFactory.getLogger(PulsarCluster.class);
    public static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
    public static final String CLIENT_SCRIPT = "/pulsar/bin/pulsar-client";
    public static final String PULSAR_COMMAND_SCRIPT = "/pulsar/bin/pulsar";
    public static final String CURL = "/usr/bin/curl";

    /** Un-prefixed network alias of the ZooKeeper container. */
    private static final String ZOOKEEPER_ALIAS = "zookeeper";
    /** Network alias given to a configuration store created by {@link #forSpec(PulsarClusterSpec)}. */
    private static final String CONFIGURATION_STORE_ALIAS = "configuration-store";
    /** Address of the configuration store as handed to ZooKeeper, brokers and the proxy. */
    private static final String CONFIGURATION_STORE_SERVERS = "configuration-store:2184";
    /** In-container path of the CA certificate used as TLS trust root. */
    private static final String CA_CERT_FILE = "/pulsar/certificate-authority/certs/ca.cert.pem";
    private static final String PROXY_CERT_FILE = "/pulsar/certificate-authority/server-keys/proxy.cert.pem";
    private static final String PROXY_KEY_FILE = "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem";

    private final PulsarClusterSpec spec;
    private final String clusterName;
    private final Network network;
    private final ZKContainer zkContainer;
    private final CSContainer csContainer;
    private final boolean sharedCsContainer;
    private final Map<String, BKContainer> bookieContainers;
    private final Map<String, BrokerContainer> brokerContainers;
    private final Map<String, WorkerContainer> workerContainers;
    private final ProxyContainer proxyContainer;
    private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap();
    private Map<String, Map<String, String>> externalServiceEnvs;

    /**
     * Creates a cluster with its own configuration store on a freshly created network.
     *
     * @param spec the cluster specification
     * @return a configured but not yet started cluster
     */
    public static PulsarCluster forSpec(PulsarClusterSpec spec) {
        CSContainer csContainer = new CSContainer(spec.clusterName());
        csContainer.withNetwork(Network.newNetwork());
        csContainer.withNetworkAliases(CONFIGURATION_STORE_ALIAS);
        return new PulsarCluster(spec, csContainer, false);
    }

    /**
     * Creates a cluster that re-uses an existing configuration store and its network. The
     * shared configuration store is neither started nor stopped by this cluster.
     *
     * @param spec        the cluster specification
     * @param csContainer the shared configuration-store container
     * @return a configured but not yet started cluster
     */
    public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContainer) {
        return new PulsarCluster(spec, csContainer, true);
    }

    private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean sharedCsContainer) {
        // The factory helpers below read these fields, so they must be assigned first.
        this.spec = spec;
        this.sharedCsContainer = sharedCsContainer;
        this.clusterName = spec.clusterName();
        this.network = csContainer.getNetwork();
        this.csContainer = csContainer;
        this.bookieContainers = Maps.newTreeMap();
        this.brokerContainers = Maps.newTreeMap();
        this.workerContainers = Maps.newTreeMap();

        this.zkContainer = createZooKeeperContainer();
        this.proxyContainer = createProxyContainer();
        this.bookieContainers.putAll(
                PulsarCluster.runNumContainers("bookie", spec.numBookies(), this::createBookieContainer));
        this.brokerContainers.putAll(
                PulsarCluster.runNumContainers("broker", spec.numBrokers(), this::createBrokerContainer));

        spec.classPathVolumeMounts.forEach((key, value) -> {
            this.zkContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
            this.proxyContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
            this.bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
            this.brokerContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
            this.workerContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
        });
    }

    /** Builds the (not yet started) local ZooKeeper container. */
    private ZKContainer createZooKeeperContainer() {
        String zkAlias = appendClusterName(ZOOKEEPER_ALIAS);
        ZKContainer zk = new ZKContainer(clusterName);
        zk.withNetwork(network);
        zk.withNetworkAliases(zkAlias);
        zk.withEnv("clusterName", clusterName);
        zk.withEnv("zkServers", zkAlias);
        zk.withEnv("configurationStore", CONFIGURATION_STORE_SERVERS);
        zk.withEnv("forceSync", "no");
        zk.withEnv("pulsarNode", appendClusterName("pulsar-broker-0"));
        return zk;
    }

    /** Builds the (not yet started) proxy container, including TLS settings and spec overrides. */
    private ProxyContainer createProxyContainer() {
        String proxyAlias = appendClusterName("pulsar-proxy");
        String zkAlias = appendClusterName(ZOOKEEPER_ALIAS);
        ProxyContainer proxy = new ProxyContainer(clusterName, proxyAlias, spec.enableTls);
        proxy.withNetwork(network);
        proxy.withNetworkAliases(proxyAlias);
        proxy.withEnv("zkServers", zkAlias);
        proxy.withEnv("zookeeperServers", zkAlias);
        proxy.withEnv("configurationStoreServers", CONFIGURATION_STORE_SERVERS);
        proxy.withEnv("clusterName", clusterName);
        if (spec.enableTls) {
            proxy.withEnv("webServicePortTls", String.valueOf(8081));
            proxy.withEnv("servicePortTls", String.valueOf(6651));
            proxy.withEnv("forwardAuthorizationCredentials", "true");
            proxy.withEnv("tlsRequireTrustedClientCertOnConnect", "true");
            proxy.withEnv("tlsAllowInsecureConnection", "false");
            proxy.withEnv("tlsCertificateFilePath", PROXY_CERT_FILE);
            proxy.withEnv("tlsKeyFilePath", PROXY_KEY_FILE);
            proxy.withEnv("tlsTrustCertsFilePath", CA_CERT_FILE);
            proxy.withEnv("brokerClientAuthenticationPlugin", AuthenticationTls.class.getName());
            proxy.withEnv("brokerClientAuthenticationParameters", String.format("tlsCertFile:%s,tlsKeyFile:%s", "/pulsar/certificate-authority/client-keys/admin.cert.pem", "/pulsar/certificate-authority/client-keys/admin.key-pk8.pem"));
            proxy.withEnv("tlsEnabledWithBroker", "true");
            proxy.withEnv("brokerClientTrustCertsFilePath", CA_CERT_FILE);
            proxy.withEnv("brokerClientCertificateFilePath", PROXY_CERT_FILE);
            proxy.withEnv("brokerClientKeyFilePath", PROXY_KEY_FILE);
        }
        // Spec-provided settings are applied last so they override the defaults above.
        if (spec.proxyEnvs != null) {
            spec.proxyEnvs.forEach((key, value) -> proxy.withEnv(key, value));
        }
        if (spec.proxyMountFiles != null) {
            spec.proxyMountFiles.forEach((hostPath, containerPath) -> proxy.withFileSystemBind(hostPath, containerPath));
        }
        return proxy;
    }

    /**
     * Builds one (not yet started) bookie container.
     *
     * @param name the un-prefixed container name, e.g. {@code pulsar-bookie-0}
     */
    private BKContainer createBookieContainer(String name) {
        BKContainer bookie = new BKContainer(clusterName, name);
        bookie.withNetwork(network);
        bookie.withNetworkAliases(appendClusterName(name));
        bookie.withEnv("zkServers", appendClusterName(ZOOKEEPER_ALIAS));
        bookie.withEnv("useHostNameAsBookieID", "true");
        bookie.withEnv("journalSyncData", "false");
        bookie.withEnv("journalMaxGroupWaitMSec", "0");
        bookie.withEnv("clusterName", clusterName);
        bookie.withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95");
        bookie.withEnv("diskUsageThreshold", "0.99");
        bookie.withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97");
        bookie.withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize));
        if (spec.bookkeeperEnvs != null) {
            bookie.withEnv(spec.bookkeeperEnvs);
        }
        if (spec.bookieAdditionalPorts != null) {
            spec.bookieAdditionalPorts.forEach(port -> bookie.addExposedPort(port));
        }
        return bookie;
    }

    /**
     * Builds one (not yet started) broker container.
     *
     * @param name the un-prefixed container name, e.g. {@code pulsar-broker-0}
     */
    private BrokerContainer createBrokerContainer(String name) {
        String brokerAlias = appendClusterName(name);
        String zkAlias = appendClusterName(ZOOKEEPER_ALIAS);
        BrokerContainer broker = new BrokerContainer(clusterName, brokerAlias, spec.enableTls);
        broker.withNetwork(network);
        broker.withNetworkAliases(brokerAlias);
        broker.withEnv("zkServers", zkAlias);
        broker.withEnv("zookeeperServers", zkAlias);
        broker.withEnv("configurationStoreServers", CONFIGURATION_STORE_SERVERS);
        broker.withEnv("clusterName", clusterName);
        broker.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1");
        broker.withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1");
        broker.withEnv("AWS_ACCESS_KEY_ID", "accesskey");
        broker.withEnv("AWS_SECRET_KEY", "secretkey");
        broker.withEnv("maxMessageSize", "" + spec.maxMessageSize);
        if (spec.enableTls) {
            broker.withEnv("webServicePortTls", String.valueOf(8081));
            broker.withEnv("brokerServicePortTls", String.valueOf(6651));
            broker.withEnv("authenticateOriginalAuthData", "true");
            broker.withEnv("tlsAllowInsecureConnection", "false");
            broker.withEnv("tlsRequireTrustedClientCertOnConnect", "true");
            broker.withEnv("tlsTrustCertsFilePath", CA_CERT_FILE);
            broker.withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/broker.cert.pem");
            broker.withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/broker.key-pk8.pem");
        }
        if (spec.queryLastMessage) {
            broker.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
            broker.withEnv("bookkeeperUseV2WireProtocol", "false");
        }
        // Spec-provided settings are applied last so they override the defaults above.
        if (spec.brokerEnvs != null) {
            broker.withEnv(spec.brokerEnvs);
        }
        if (spec.brokerMountFiles != null) {
            spec.brokerMountFiles.forEach((hostPath, containerPath) -> broker.withFileSystemBind(hostPath, containerPath));
        }
        if (spec.brokerAdditionalPorts() != null) {
            spec.brokerAdditionalPorts().forEach(port -> broker.addExposedPort(port));
        }
        return broker;
    }

    /** @return the plain-text binary service URL exposed by the proxy */
    public String getPlainTextServiceUrl() {
        return this.proxyContainer.getPlainTextServiceUrl();
    }

    /** @return the HTTP service URL exposed by the proxy */
    public String getHttpServiceUrl() {
        return this.proxyContainer.getHttpServiceUrl();
    }

    /** @return the HTTPS service URL of a randomly chosen broker */
    public String getAnyBrokersHttpsServiceUrl() {
        return this.getAnyBroker().getHttpsServiceUrl();
    }

    /** @return the TLS binary service URL of a randomly chosen broker */
    public String getAnyBrokersServiceUrlTls() {
        return this.getAnyBroker().getServiceUrlTls();
    }

    /**
     * Builds a multi-host HTTP URL covering every broker.
     *
     * @return {@code http://host1:port1,host2:port2,...} using each broker's host and the
     *         mapped port for container port 8080; just {@code http://} if there are no brokers
     */
    public String getAllBrokersHttpServiceUrl() {
        StringBuilder multiUrl = new StringBuilder("http://");
        Iterator<BrokerContainer> brokers = this.getBrokers().iterator();
        while (brokers.hasNext()) {
            BrokerContainer broker = brokers.next();
            multiUrl.append(broker.getHost()).append(':').append(broker.getMappedPort(8080));
            if (brokers.hasNext()) {
                multiUrl.append(',');
            }
        }
        return multiUrl.toString();
    }

    /** @return {@code host:port} of the local ZooKeeper, using the mapped port for 2181 */
    public String getZKConnString() {
        return this.zkContainer.getHost() + ":" + this.zkContainer.getMappedPort(2181);
    }

    /** @return {@code host:port} of the configuration store, using the mapped port for 2184 */
    public String getCSConnString() {
        return this.csContainer.getHost() + ":" + this.csContainer.getMappedPort(2184);
    }

    public Network getNetwork() {
        return this.network;
    }

    /**
     * @return the external services; an empty map before {@link #start()}, afterwards whatever
     *         the spec provides
     */
    public Map<String, GenericContainer<?>> getExternalServices() {
        return this.externalServices;
    }

    /**
     * Starts the cluster: ZooKeeper, the configuration store (unless shared), cluster
     * initialization, bookies, brokers, the proxy and finally the external services declared
     * in the spec (the latter are started in parallel).
     *
     * @throws Exception if running the cluster initialization script fails
     */
    public void start() throws Exception {
        this.zkContainer.start();
        log.info("Successfully started local zookeeper container.");
        if (!this.sharedCsContainer) {
            this.csContainer.start();
            log.info("Successfully started configuration store container.");
        }
        this.zkContainer.execCmd("bin/init-cluster.sh");
        log.info("Successfully initialized the cluster.");
        this.bookieContainers.values().forEach(PulsarContainer::start);
        log.info("Successfully started {} bookie containers.", this.bookieContainers.size());
        this.startAllBrokers();
        log.info("Successfully started {} broker containers.", this.brokerContainers.size());
        this.proxyContainer.start();
        log.info("Successfully started pulsar proxy.");
        log.info("Pulsar cluster {} is up running:", this.clusterName);
        log.info("\tBinary Service Url : {}", this.getPlainTextServiceUrl());
        log.info("\tHttp Service Url : {}", this.getHttpServiceUrl());
        this.externalServices = this.spec.externalServices;
        this.externalServiceEnvs = this.spec.externalServiceEnvs;
        if (this.externalServices != null) {
            this.externalServices.entrySet().parallelStream().forEach(service -> {
                String alias = service.getKey();
                GenericContainer<?> serviceContainer = service.getValue();
                serviceContainer.withNetwork(this.network);
                serviceContainer.withNetworkAliases(alias);
                Map<String, String> env =
                        this.externalServiceEnvs == null ? null : this.externalServiceEnvs.get(alias);
                if (env != null) {
                    serviceContainer.withEnv(env);
                }
                PulsarContainer.configureLeaveContainerRunning(serviceContainer);
                serviceContainer.start();
                log.info("Successfully started external service {}.", alias);
            });
        }
    }

    /**
     * Attaches an external service container to the cluster network and starts it.
     *
     * @param networkAlias     the network alias under which the service is reachable
     * @param serviceContainer the container to start
     */
    public void startService(String networkAlias, GenericContainer<?> serviceContainer) {
        log.info("Starting external service {} ...", networkAlias);
        serviceContainer.withNetwork(this.network);
        serviceContainer.withNetworkAliases(networkAlias);
        PulsarContainer.configureLeaveContainerRunning(serviceContainer);
        serviceContainer.start();
        log.info("Successfully start external service {}", networkAlias);
    }

    /**
     * Stops an external service container, unless containers are configured to be left running.
     *
     * @param networkAlias     the alias of the service (used for logging only)
     * @param serviceContainer the container to stop
     */
    public static void stopService(String networkAlias, GenericContainer<?> serviceContainer) {
        if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
            PulsarCluster.logIgnoringStopDueToLeaveRunning();
            return;
        }
        log.info("Stopping external service {} ...", networkAlias);
        serviceContainer.stop();
        log.info("Successfully stop external service {}", networkAlias);
    }

    /**
     * Creates {@code numContainers} containers named {@code pulsar-<serviceName>-<i>}.
     *
     * @param serviceName      the service part of the generated names
     * @param numContainers    how many containers to create
     * @param containerCreator factory invoked once per generated name
     * @return a sorted map from generated name to created container
     */
    private static <T extends PulsarContainer> Map<String, T> runNumContainers(String serviceName, int numContainers, Function<String, T> containerCreator) {
        TreeMap<String, T> containers = Maps.newTreeMap();
        for (int i = 0; i < numContainers; ++i) {
            String name = "pulsar-" + serviceName + "-" + i;
            containers.put(name, containerCreator.apply(name));
        }
        return containers;
    }

    /**
     * Stops every container of the cluster and closes the network, unless containers are
     * configured to be left running. A shared configuration store is left untouched.
     *
     * <p>The network is closed even when stopping a container fails; the failure itself is
     * still propagated to the caller.
     */
    public synchronized void stop() {
        if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
            PulsarCluster.logIgnoringStopDueToLeaveRunning();
            return;
        }
        try {
            PulsarCluster.stopInParallel(this.workerContainers.values());
            if (this.externalServices != null) {
                PulsarCluster.stopInParallel(this.externalServices.values());
            }
            this.proxyContainer.stop();
            PulsarCluster.stopInParallel(this.brokerContainers.values());
            PulsarCluster.stopInParallel(this.bookieContainers.values());
            if (!this.sharedCsContainer) {
                this.csContainer.stop();
            }
            this.zkContainer.stop();
        } finally {
            try {
                this.network.close();
            } catch (Exception e) {
                log.info("Failed to shutdown network for pulsar cluster {}", this.clusterName, e);
            }
        }
    }

    /** Stops all non-null containers of the collection using a parallel stream. */
    private static void stopInParallel(Collection<? extends GenericContainer<?>> containers) {
        containers.parallelStream().filter(Objects::nonNull).forEach(GenericContainer::stop);
    }

    /**
     * Creates and starts function workers for the given runtime type. Runtime types other than
     * {@code THREAD} and {@code PROCESS} are ignored.
     *
     * @param suffix             suffix embedded in the generated worker names
     * @param runtimeType        the function runtime the workers should use
     * @param numFunctionWorkers how many workers to create
     */
    public synchronized void setupFunctionWorkers(String suffix, FunctionRuntimeType runtimeType, int numFunctionWorkers) {
        switch (runtimeType) {
            case THREAD:
                this.startFunctionWorkersWithThreadContainerFactory(suffix, numFunctionWorkers);
                break;
            case PROCESS:
                this.startFunctionWorkersWithProcessContainerFactory(suffix, numFunctionWorkers);
                break;
            default:
                break;
        }
    }

    private void startFunctionWorkersWithProcessContainerFactory(String suffix, int numFunctionWorkers) {
        this.workerContainers.putAll(PulsarCluster.runNumContainers(
                "functions-worker-process-" + suffix, numFunctionWorkers, this::createWorkerContainer));
        this.startWorkers();
    }

    /**
     * Builds one (not yet started) function worker container.
     *
     * <p>NOTE(review): unlike the broker, bookie and proxy containers, the broker/ZooKeeper
     * addresses and the network alias used here are not passed through
     * {@link #appendClusterName(String)} - confirm this is intended for clusters that share a
     * configuration store.
     *
     * @param name the worker name, also used as network alias, worker id and hostname
     */
    private WorkerContainer createWorkerContainer(String name) {
        String serviceUrl = "pulsar://pulsar-broker-0:6650";
        String httpServiceUrl = "http://pulsar-broker-0:8080";
        WorkerContainer worker = new WorkerContainer(this.clusterName, name);
        worker.withNetwork(this.network);
        worker.withNetworkAliases(name);
        worker.withEnv("PF_workerId", name);
        worker.withEnv("PF_workerHostname", name);
        worker.withEnv("PF_workerPort", "8080");
        worker.withEnv("PF_pulsarFunctionsCluster", this.clusterName);
        worker.withEnv("PF_pulsarServiceUrl", serviceUrl);
        worker.withEnv("PF_pulsarWebServiceUrl", httpServiceUrl);
        worker.withEnv("clusterName", this.clusterName);
        worker.withEnv("zookeeperServers", ZOOKEEPER_ALIAS);
        worker.withEnv("zkServers", ZOOKEEPER_ALIAS);
        return worker;
    }

    private void startFunctionWorkersWithThreadContainerFactory(String suffix, int numFunctionWorkers) {
        this.workerContainers.putAll(PulsarCluster.runNumContainers("functions-worker-thread-" + suffix, numFunctionWorkers, name -> {
            WorkerContainer worker = this.createWorkerContainer(name);
            worker.withEnv("PF_functionRuntimeFactoryClassName", "org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory");
            worker.withEnv("PF_functionRuntimeFactoryConfigs_threadGroupName", "pf-container-group");
            return worker;
        }));
        this.startWorkers();
    }

    /** Starts every registered worker container using a parallel stream. */
    public synchronized void startWorkers() {
        this.workerContainers.values().parallelStream().forEach(PulsarContainer::start);
        log.info("Successfully started {} worker containers.", this.workerContainers.size());
    }

    /**
     * Stops the named worker and removes it from the cluster, unless containers are configured
     * to be left running. Unknown names are logged and ignored.
     *
     * @param workerName the full worker name as generated when the worker was created
     */
    public synchronized void stopWorker(String workerName) {
        if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
            PulsarCluster.logIgnoringStopDueToLeaveRunning();
            return;
        }
        WorkerContainer worker = this.workerContainers.get(workerName);
        if (worker == null) {
            log.warn("Failed to find the worker to stop ({})", workerName);
            return;
        }
        worker.stop();
        this.workerContainers.remove(workerName);
        log.info("Worker {} stopped and removed from the map of worker containers", workerName);
    }

    /** Stops and forgets all workers, unless containers are configured to be left running. */
    public synchronized void stopWorkers() {
        if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
            PulsarCluster.logIgnoringStopDueToLeaveRunning();
            return;
        }
        this.workerContainers.values().parallelStream().forEach(PulsarContainer::stop);
        this.workerContainers.clear();
    }

    /**
     * Attaches each container to the cluster network under its map key and starts it.
     *
     * @param containers containers to start, keyed by network alias
     */
    public void startContainers(Map<String, GenericContainer<?>> containers) {
        containers.forEach((name, container) -> {
            PulsarContainer.configureLeaveContainerRunning(container);
            container.withNetwork(this.network);
            container.withNetworkAliases(name);
            container.start();
            log.info("Successfully start container {}.", name);
        });
    }

    /**
     * Stops the given containers in parallel, unless containers are configured to be left
     * running.
     *
     * @param containers the containers to stop
     */
    public static void stopContainers(Map<String, GenericContainer<?>> containers) {
        if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
            PulsarCluster.logIgnoringStopDueToLeaveRunning();
            return;
        }
        containers.values().parallelStream().forEach(GenericContainer::stop);
        log.info("Successfully stop containers : {}", containers);
    }

    private static void logIgnoringStopDueToLeaveRunning() {
        log.warn("Ignoring stop due to PULSAR_CONTAINERS_LEAVE_RUNNING=true.");
    }

    /**
     * @return a randomly chosen broker
     * @throws IllegalArgumentException if the cluster has no brokers
     */
    public BrokerContainer getAnyBroker() {
        return this.getAnyContainer(this.brokerContainers, "pulsar-broker");
    }

    /**
     * @return a randomly chosen function worker
     * @throws IllegalArgumentException if the cluster has no workers
     */
    public synchronized WorkerContainer getAnyWorker() {
        return this.getAnyContainer(this.workerContainers, "pulsar-functions-worker");
    }

    /** @return a snapshot list of all function workers */
    public synchronized List<WorkerContainer> getAlWorkers() {
        return new ArrayList<WorkerContainer>(this.workerContainers.values());
    }

    /**
     * @param index zero-based broker index
     * @return the broker named {@code pulsar-broker-<index>}
     * @throws IllegalArgumentException if there are no brokers or the index is out of range
     */
    public BrokerContainer getBroker(int index) {
        return this.getAnyContainer(this.brokerContainers, "pulsar-broker", index);
    }

    /**
     * @param index zero-based position of the worker in name order
     * @return the worker at that position
     * @throws IllegalArgumentException if there are no workers or the index is out of range
     */
    public synchronized WorkerContainer getWorker(int index) {
        return this.getAnyContainer(this.workerContainers, "pulsar-functions-worker", index);
    }

    /**
     * @param workerName the full worker name
     * @return the worker, or {@code null} if no worker has that name
     */
    public synchronized WorkerContainer getWorker(String workerName) {
        return this.workerContainers.get(workerName);
    }

    /**
     * Picks a random container from the map.
     *
     * @throws IllegalArgumentException if the map is empty
     */
    private <T> T getAnyContainer(Map<String, T> containers, String serviceName) {
        List<T> containerList = Lists.newArrayList(containers.values());
        Preconditions.checkArgument(!containerList.isEmpty(), "No " + serviceName + " is alive");
        Collections.shuffle(containerList);
        return containerList.get(0);
    }

    /**
     * Looks up a container by index.
     *
     * <p>The container keyed {@code <serviceName>-<index>} is returned when it exists. Worker
     * keys also embed the runtime type and a caller-supplied suffix, so they never match that
     * pattern; in that case the container at position {@code index} in the map's iteration
     * order is returned instead of {@code null}.
     *
     * @throws IllegalArgumentException if the map is empty or the index is out of range
     */
    private <T> T getAnyContainer(Map<String, T> containers, String serviceName, int index) {
        Preconditions.checkArgument(!containers.isEmpty(), "No " + serviceName + " is alive");
        Preconditions.checkArgument(index >= 0 && index < containers.size(), "Index : " + index + " is out range");
        T byName = containers.get(serviceName.toLowerCase() + "-" + index);
        if (byName != null) {
            return byName;
        }
        // The range check above guarantees that this positional access is valid.
        return new ArrayList<T>(containers.values()).get(index);
    }

    /** @return a live view of the broker containers */
    public Collection<BrokerContainer> getBrokers() {
        return this.brokerContainers.values();
    }

    public ProxyContainer getProxy() {
        return this.proxyContainer;
    }

    /** @return a live view of the bookie containers */
    public Collection<BKContainer> getBookies() {
        return this.bookieContainers.values();
    }

    public ZKContainer getZooKeeper() {
        return this.zkContainer;
    }

    /**
     * Runs {@code pulsar-admin} with the given arguments on a randomly chosen broker.
     *
     * @param commands the arguments passed to the script
     * @return the result of the execution
     * @throws Exception if executing the command in the container fails
     */
    public ContainerExecResult runAdminCommandOnAnyBroker(String ... commands) throws Exception {
        return this.runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
    }

    /**
     * Runs the {@code pulsar} script with the given arguments on a randomly chosen broker.
     *
     * @param commands the arguments passed to the script
     * @return the result of the execution
     * @throws Exception if executing the command in the container fails
     */
    public ContainerExecResult runPulsarBaseCommandOnAnyBroker(String ... commands) throws Exception {
        return this.runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT, commands);
    }

    /** Prepends the script path to the arguments and executes the result on a random broker. */
    private ContainerExecResult runCommandOnAnyBrokerWithScript(String scriptType, String ... commands) throws Exception {
        BrokerContainer container = this.getAnyBroker();
        String[] cmds = new String[commands.length + 1];
        cmds[0] = scriptType;
        System.arraycopy(commands, 0, cmds, 1, commands.length);
        return container.execCmd(cmds);
    }

    public void stopAllBrokers() {
        this.brokerContainers.values().forEach(PulsarContainer::stop);
    }

    public void startAllBrokers() {
        this.brokerContainers.values().forEach(PulsarContainer::start);
    }

    public void stopAllBookies() {
        this.bookieContainers.values().forEach(PulsarContainer::stop);
    }

    public void startAllBookies() {
        this.bookieContainers.values().forEach(PulsarContainer::start);
    }

    public void stopZooKeeper() {
        this.zkContainer.stop();
    }

    public void startZooKeeper() {
        this.zkContainer.start();
    }

    /**
     * Creates the namespace {@code public/<nsName>} on this cluster.
     *
     * @param nsName the namespace name without the tenant prefix
     * @return the result of the admin command
     * @throws Exception if executing the command fails
     */
    public ContainerExecResult createNamespace(String nsName) throws Exception {
        return this.runAdminCommandOnAnyBroker("namespaces", "create", "public/" + nsName, "--clusters", this.clusterName);
    }

    /**
     * Creates a partitioned topic.
     *
     * @param topicName  the topic name
     * @param partitions the number of partitions
     * @return the result of the admin command
     * @throws Exception if executing the command fails
     */
    public ContainerExecResult createPartitionedTopic(String topicName, int partitions) throws Exception {
        return this.runAdminCommandOnAnyBroker("topics", "create-partitioned-topic", topicName, "-p", String.valueOf(partitions));
    }

    /**
     * Enables or disables deduplication on the namespace {@code public/<nsName>}.
     *
     * @param nsName  the namespace name without the tenant prefix
     * @param enabled {@code true} to enable, {@code false} to disable
     * @return the result of the admin command
     * @throws Exception if executing the command fails
     */
    public ContainerExecResult enableDeduplication(String nsName, boolean enabled) throws Exception {
        return this.runAdminCommandOnAnyBroker("namespaces", "set-deduplication", "public/" + nsName, enabled ? "--enable" : "--disable");
    }

    /**
     * Best-effort dump of a function's log file from every worker into this class's log.
     * Failures to fetch a log are logged and never propagated.
     *
     * @param name the function name; the log is read from
     *             {@code /pulsar/logs/functions/public/default/<name>/<name>-0.log}
     */
    public void dumpFunctionLogs(String name) {
        for (WorkerContainer container : this.getAlWorkers()) {
            log.info("Trying to get function {} logs from container {}", name, container.getContainerName());
            try {
                String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log";
                String logs = container.copyFileFromContainer(logFile, (InputStream inputStream) -> IOUtils.toString(inputStream, "utf-8"));
                log.info("Function {} logs {}", name, logs);
            } catch (NotFoundException notFound) {
                log.info("Cannot download {} logs from {} not found exception {}", name, container.getContainerName(), notFound.toString());
            } catch (Throwable err) {
                // Deliberately broad: log dumping is diagnostic only and must not fail the caller.
                log.info("Cannot download {} logs from {}", name, container.getContainerName(), err);
            }
        }
    }

    /**
     * Prefixes {@code name} with the cluster name when the configuration store is shared, so
     * that names stay unique on the shared network; otherwise returns {@code name} unchanged.
     */
    private String appendClusterName(String name) {
        return this.sharedCsContainer ? this.clusterName + "-" + name : name;
    }

    /**
     * @return a randomly chosen bookie
     * @throws IllegalArgumentException if the cluster has no bookies
     */
    public BKContainer getAnyBookie() {
        return this.getAnyContainer(this.bookieContainers, "bookie");
    }

    public PulsarClusterSpec getSpec() {
        return this.spec;
    }

    public String getClusterName() {
        return this.clusterName;
    }
}

