package org.apache.distributedlog.service;

import com.twitter.finagle.builder.Server;
import java.io.File;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.LocalBookKeeper;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LocalDLMEmulator;
import org.apache.distributedlog.client.routing.SingleHostRoutingService;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/service/DistributedLogCluster.class */
public class DistributedLogCluster {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedLogCluster.class);
    private final DistributedLogConfiguration dlConf;
    private final ZooKeeperServerShim zks;
    private final LocalDLMEmulator dlmEmulator;
    private DLServer dlServer;
    private final boolean shouldStartProxy;
    private final int proxyPort;
    private final boolean thriftmux;
    private final List<File> tmpDirs;

    /* loaded from: input_file:org/apache/distributedlog/service/DistributedLogCluster$Builder.class */
    public static class Builder {
        int numBookies;
        boolean shouldStartZK;
        String zkHost;
        int zkPort;
        boolean shouldStartProxy;
        int proxyPort;
        boolean thriftmux;
        DistributedLogConfiguration dlConf;
        ServerConfiguration bkConf;

        private Builder() {
            this.numBookies = 3;
            this.shouldStartZK = true;
            this.zkHost = "127.0.0.1";
            this.zkPort = 0;
            this.shouldStartProxy = true;
            this.proxyPort = 7000;
            this.thriftmux = false;
            this.dlConf = new DistributedLogConfiguration().setLockTimeout(10L).setOutputBufferSize(0).setImmediateFlushEnabled(true);
            this.bkConf = new ServerConfiguration();
        }

        public Builder numBookies(int i) {
            this.numBookies = i;
            return this;
        }

        public Builder shouldStartZK(boolean z) {
            this.shouldStartZK = z;
            return this;
        }

        public Builder zkServers(String str) {
            this.zkHost = str;
            return this;
        }

        public Builder zkPort(int i) {
            this.zkPort = i;
            return this;
        }

        public Builder shouldStartProxy(boolean z) {
            this.shouldStartProxy = z;
            return this;
        }

        public Builder proxyPort(int i) {
            this.proxyPort = i;
            return this;
        }

        public Builder dlConf(DistributedLogConfiguration distributedLogConfiguration) {
            this.dlConf = distributedLogConfiguration;
            return this;
        }

        public Builder bkConf(ServerConfiguration serverConfiguration) {
            this.bkConf = serverConfiguration;
            return this;
        }

        public Builder thriftmux(boolean z) {
            this.thriftmux = z;
            return this;
        }

        public DistributedLogCluster build() throws Exception {
            return new DistributedLogCluster(this.dlConf, this.bkConf, this.numBookies, this.shouldStartZK, this.zkHost, this.zkPort, this.shouldStartProxy, this.proxyPort, this.thriftmux);
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/service/DistributedLogCluster$DLServer.class */
    public static class DLServer {
        static final int MAX_RETRIES = 20;
        static final int MIN_PORT = 1025;
        static final int MAX_PORT = 65535;
        int proxyPort;
        public final InetSocketAddress address;
        public final Pair<DistributedLogServiceImpl, Server> dlServer;
        private final SingleHostRoutingService routingService = SingleHostRoutingService.of((SocketAddress) null);

        protected DLServer(DistributedLogConfiguration distributedLogConfiguration, URI uri, int i, boolean z) throws Exception {
            this.proxyPort = i;
            boolean z2 = false;
            int i2 = 0;
            Pair<DistributedLogServiceImpl, Server> pair = null;
            while (!z2) {
                try {
                    org.apache.distributedlog.service.config.ServerConfiguration serverConfiguration = new org.apache.distributedlog.service.config.ServerConfiguration();
                    serverConfiguration.loadConf(distributedLogConfiguration);
                    serverConfiguration.setServerShardId(this.proxyPort);
                    pair = DistributedLogServer.runServer(serverConfiguration, distributedLogConfiguration, uri, new IdentityStreamPartitionConverter(), this.routingService, new NullStatsProvider(), this.proxyPort, z, new EqualLoadAppraiser());
                    this.routingService.setAddress(DLSocketAddress.getSocketAddress(this.proxyPort));
                    this.routingService.startService();
                    ((DistributedLogServiceImpl) pair.getLeft()).startPlacementPolicy();
                    z2 = true;
                } catch (BindException e) {
                    i2++;
                    if (i2 > MAX_RETRIES) {
                        throw e;
                    }
                    this.proxyPort++;
                    if (this.proxyPort > MAX_PORT) {
                        this.proxyPort = MIN_PORT;
                    }
                }
            }
            DistributedLogCluster.LOG.info("Running DL on port {}", Integer.valueOf(this.proxyPort));
            this.dlServer = pair;
            this.address = DLSocketAddress.getSocketAddress(this.proxyPort);
        }

        public InetSocketAddress getAddress() {
            return this.address;
        }

        public void shutdown() {
            DistributedLogServer.closeServer(this.dlServer, 0L, TimeUnit.MILLISECONDS);
            this.routingService.stopService();
        }
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    private DistributedLogCluster(DistributedLogConfiguration distributedLogConfiguration, ServerConfiguration serverConfiguration, int i, boolean z, String str, int i2, boolean z2, int i3, boolean z3) throws Exception {
        this.tmpDirs = new ArrayList();
        this.dlConf = distributedLogConfiguration;
        if (z) {
            File createTempDir = IOUtils.createTempDir("zookeeper", "distrlog");
            this.tmpDirs.add(createTempDir);
            if (0 == i2) {
                Pair runZookeeperOnAnyPort = LocalDLMEmulator.runZookeeperOnAnyPort(createTempDir);
                this.zks = (ZooKeeperServerShim) runZookeeperOnAnyPort.getLeft();
                i2 = ((Integer) runZookeeperOnAnyPort.getRight()).intValue();
            } else {
                this.zks = LocalBookKeeper.runZookeeper(1000, i2, createTempDir);
            }
        } else {
            this.zks = null;
        }
        this.dlmEmulator = LocalDLMEmulator.newBuilder().numBookies(i).zkHost(str).zkPort(i2).serverConf(serverConfiguration).shouldStartZK(false).build();
        this.shouldStartProxy = z2;
        this.proxyPort = i3;
        this.thriftmux = z3;
    }

    public void start() throws Exception {
        this.dlmEmulator.start();
        DLMetadata.create(new BKDLConfig(this.dlmEmulator.getZkServers(), "/ledgers").setACLRootPath(".acl")).update(this.dlmEmulator.getUri());
        if (this.shouldStartProxy) {
            this.dlServer = new DLServer(this.dlConf, this.dlmEmulator.getUri(), this.proxyPort, this.thriftmux);
        } else {
            this.dlServer = null;
        }
    }

    public void stop() throws Exception {
        if (null != this.dlServer) {
            this.dlServer.shutdown();
        }
        this.dlmEmulator.teardown();
        if (null != this.zks) {
            this.zks.stop();
        }
        Iterator<File> it = this.tmpDirs.iterator();
        while (it.hasNext()) {
            FileUtils.forceDeleteOnExit(it.next());
        }
    }

    public URI getUri() {
        return this.dlmEmulator.getUri();
    }

    public String getZkServers() {
        return this.dlmEmulator.getZkServers();
    }

    public String getProxyFinagleStr() {
        return "inet!" + (this.dlServer == null ? "127.0.0.1:" + this.proxyPort : this.dlServer.getAddress().toString());
    }
}
