package org.apache.pulsar.zookeeper;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.NamespaceExistsException;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.bookkeeper.stream.protocol.ProtocolConstants;
import org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.class */
public class LocalBookkeeperEnsemble {
    public static final int CONNECTION_TIMEOUT = 30000;
    int numberOfBookies;
    private boolean clearOldData;
    private final Supplier<Integer> portManager;
    private String HOSTPORT;
    private String advertisedAddress;
    private int zkPort;
    NIOServerCnxnFactory serverFactory;
    ZooKeeperServer zks;
    ZooKeeper zkc;
    String zkDataDirName;
    String bkDataDirName;
    BookieServer[] bs;
    ServerConfiguration[] bsConfs;
    StreamStorageLifecycleComponent streamStorage;
    Integer streamStoragePort;
    protected static final Logger LOG = LoggerFactory.getLogger(LocalBookkeeperEnsemble.class);
    static int zkSessionTimeOut = 5000;

    /* loaded from: input_file:org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble$BasePortManager.class */
    private static class BasePortManager implements Supplier<Integer> {
        private int port;

        public BasePortManager(int i) {
            this.port = i;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.function.Supplier
        public synchronized Integer get() {
            int i = this.port;
            this.port = i + 1;
            return Integer.valueOf(i);
        }
    }

    /* loaded from: input_file:org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble$ZKConnectionWatcher.class */
    public static class ZKConnectionWatcher implements Watcher {
        private CountDownLatch clientConnectLatch = new CountDownLatch(1);

        public void process(WatchedEvent watchedEvent) {
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                this.clientConnectLatch.countDown();
            }
        }

        public void waitForConnection() throws IOException {
            try {
                if (this.clientConnectLatch.await(LocalBookkeeperEnsemble.zkSessionTimeOut, TimeUnit.MILLISECONDS)) {
                } else {
                    throw new IOException("Couldn't connect to zookeeper server");
                }
            } catch (InterruptedException e) {
                throw new IOException("Interrupted when connecting to zookeeper server", e);
            }
        }
    }

    public LocalBookkeeperEnsemble(int i, int i2, Supplier<Integer> supplier) {
        this(i, i2, 4181, (String) null, (String) null, true, (String) null, supplier);
    }

    public LocalBookkeeperEnsemble(int i, int i2, int i3, String str, String str2, boolean z) {
        this(i, i2, i3, 4181, str, str2, z, (String) null);
    }

    public LocalBookkeeperEnsemble(int i, int i2, int i3, String str, String str2, boolean z, String str3) {
        this(i, i2, i3, 4181, str, str2, z, str3);
    }

    public LocalBookkeeperEnsemble(int i, int i2, int i3, int i4, String str, String str2, boolean z, String str3) {
        this(i, i2, 4181, str, str2, z, str3, new BasePortManager(i3));
    }

    public LocalBookkeeperEnsemble(int i, int i2, int i3, String str, String str2, boolean z, String str3, Supplier<Integer> supplier) {
        this.clearOldData = false;
        this.streamStoragePort = 4181;
        this.numberOfBookies = i;
        this.portManager = supplier;
        this.streamStoragePort = Integer.valueOf(i3);
        this.zkDataDirName = str;
        this.bkDataDirName = str2;
        this.clearOldData = z;
        this.zkPort = i2;
        this.advertisedAddress = str3 == null ? "127.0.0.1" : str3;
        LOG.info("Running {} bookie(s) and advertised them at {}.", Integer.valueOf(this.numberOfBookies), str3);
    }

    private void runZookeeper(int i) throws IOException {
        LOG.info("Starting ZK server");
        File file = StringUtils.isNotBlank(this.zkDataDirName) ? Files.createDirectories(Paths.get(this.zkDataDirName, new String[0]), new FileAttribute[0]).toFile() : Files.createTempDirectory("zktest", new FileAttribute[0]).toFile();
        if (this.clearOldData) {
            FileUtils.cleanDirectory(file);
        }
        try {
            System.setProperty("zookeeper.4lw.commands.whitelist", "*");
            this.zks = new ZooKeeperServer(file, file, 3000);
            this.serverFactory = new NIOServerCnxnFactory();
            this.serverFactory.configure(new InetSocketAddress(this.zkPort), i);
            this.serverFactory.startup(this.zks);
            this.zkPort = this.serverFactory.getLocalPort();
            this.HOSTPORT = "127.0.0.1:" + this.zkPort;
            LOG.info("ZooKeeper server up: {}", Boolean.valueOf(waitForServerUp(this.HOSTPORT, 30000L)));
            LOG.debug("Local ZK started (port: {}, data_directory: {})", Integer.valueOf(this.zkPort), file.getAbsolutePath());
        } catch (Exception e) {
            LOG.error("Exception while instantiating ZooKeeper", e);
            if (this.serverFactory != null) {
                this.serverFactory.shutdown();
            }
            throw new IOException(e);
        }
    }

    private void initializeZookeper() throws IOException {
        LOG.info("Instantiate ZK Client");
        try {
            ZKConnectionWatcher zKConnectionWatcher = new ZKConnectionWatcher();
            this.zkc = new ZooKeeper(this.HOSTPORT, zkSessionTimeOut, zKConnectionWatcher);
            zKConnectionWatcher.waitForConnection();
            if (this.zkc.exists("/ledgers", false) == null) {
                this.zkc.create("/ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (this.zkc.exists("/ledgers/available", false) == null) {
                this.zkc.create("/ledgers/available", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (this.zkc.exists("/ledgers/available/readonly", false) == null) {
                this.zkc.create("/ledgers/available/readonly", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (this.zkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) == null) {
                this.zkc.create(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            LOG.error("Exception while creating znodes", e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while creating znodes", e2);
        }
    }

    private void runBookies(ServerConfiguration serverConfiguration) throws Exception {
        LOG.info("Starting Bookie(s)");
        this.bs = new BookieServer[this.numberOfBookies];
        this.bsConfs = new ServerConfiguration[this.numberOfBookies];
        for (int i = 0; i < this.numberOfBookies; i++) {
            File file = StringUtils.isNotBlank(this.bkDataDirName) ? Files.createDirectories(Paths.get(String.valueOf(this.bkDataDirName) + Integer.toString(i), new String[0]), new FileAttribute[0]).toFile() : Files.createTempDirectory("bk" + Integer.toString(i) + "test", new FileAttribute[0]).toFile();
            if (this.clearOldData) {
                FileUtils.cleanDirectory(file);
            }
            int intValue = this.portManager.get().intValue();
            String format = String.format("/ledgers/available/%s:%d", serverConfiguration.getAdvertisedAddress(), Integer.valueOf(intValue));
            if (this.zkc.exists(format, (Watcher) null) != null) {
                try {
                    this.zkc.delete(format, -1);
                } catch (KeeperException.NoNodeException unused) {
                }
            }
            this.bsConfs[i] = new ServerConfiguration(serverConfiguration);
            this.bsConfs[i].setBookiePort(intValue);
            this.bsConfs[i].setZkServers("127.0.0.1:" + this.zkPort);
            this.bsConfs[i].setJournalDirName(file.getPath());
            this.bsConfs[i].setLedgerDirNames(new String[]{file.getPath()});
            this.bsConfs[i].setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
            this.bsConfs[i].setAllowEphemeralPorts(true);
            try {
                this.bs[i] = new BookieServer(this.bsConfs[i], NullStatsLogger.INSTANCE);
            } catch (BookieException.InvalidCookieException unused2) {
                Iterator it = this.zkc.getChildren("/ledgers/cookies", false).iterator();
                while (it.hasNext()) {
                    this.zkc.delete("/ledgers/cookies/" + ((String) it.next()), -1);
                }
                new File(new File(file, "current"), "VERSION").delete();
                this.bs[i] = new BookieServer(this.bsConfs[i], NullStatsLogger.INSTANCE);
            }
            this.bs[i].start();
            LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", new Object[]{Integer.valueOf(i), Integer.valueOf(intValue), file.getAbsolutePath()});
        }
    }

    public void runStreamStorage(CompositeConfiguration compositeConfiguration) throws Exception {
        String str = "127.0.0.1:" + this.zkPort;
        String str2 = "zk://" + str + "/ledgers";
        URI create = URI.create(str2);
        compositeConfiguration.setProperty("metadataServiceUri", str2);
        compositeConfiguration.setProperty("dlog.bkcEnsembleSize", 1);
        compositeConfiguration.setProperty("dlog.bkcWriteQuorumSize", 1);
        compositeConfiguration.setProperty("dlog.bkcAckQuorumSize", 1);
        compositeConfiguration.setProperty("storageserver.grpc.port", this.streamStoragePort);
        new ZkClusterInitializer(str).initializeCluster(create, 2);
        ServerConfiguration serverConfiguration = new ServerConfiguration();
        serverConfiguration.loadConf(compositeConfiguration);
        this.streamStorage = new StreamStorageLifecycleComponent(new BookieConfiguration(serverConfiguration), NullStatsLogger.INSTANCE);
        this.streamStorage.start();
        LOG.debug("Local BK stream storage started (port: {})", this.streamStoragePort);
        Throwable th = null;
        try {
            StorageAdminClient buildAdmin = StorageClientBuilder.newBuilder().withSettings(StorageClientSettings.newBuilder().serviceUri("bk://localhost:4181").backoffPolicy(Backoff.Jitter.of(Backoff.Jitter.Type.EXPONENTIAL, 1000L, 10000L, 30L)).build()).buildAdmin();
            try {
                try {
                    LOG.info("'default' namespace for table service : {}", (NamespaceProperties) FutureUtils.result(buildAdmin.getNamespace("default")));
                } catch (Throwable th2) {
                    if (buildAdmin != null) {
                        buildAdmin.close();
                    }
                    throw th2;
                }
            } catch (NamespaceNotFoundException unused) {
                LOG.info("Creating default namespace");
                try {
                    LOG.info("Successfully created 'default' namespace :\n{}", (NamespaceProperties) FutureUtils.result(buildAdmin.createNamespace("default", NamespaceConfiguration.newBuilder().setDefaultStreamConf(ProtocolConstants.DEFAULT_STREAM_CONF).build())));
                } catch (NamespaceExistsException unused2) {
                    LOG.warn("Namespace 'default' already existed.");
                }
            }
            if (buildAdmin != null) {
                buildAdmin.close();
            }
        } catch (Throwable th3) {
            if (0 == 0) {
                th = th3;
            } else if (null != th3) {
                th.addSuppressed(th3);
            }
            throw th;
        }
    }

    public void start(boolean z) throws Exception {
        LOG.debug("Local ZK/BK starting ...");
        ServerConfiguration serverConfiguration = new ServerConfiguration();
        serverConfiguration.setLedgerStorageClass(DbLedgerStorage.class.getName());
        serverConfiguration.setProperty("dbStorage_writeCacheMaxSizeMb", 2);
        serverConfiguration.setProperty("dbStorage_readAheadCacheMaxSizeMb", 1);
        serverConfiguration.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 1);
        serverConfiguration.setProperty("dbStorage_rocksDB_blockCacheSize", 1048576);
        serverConfiguration.setFlushInterval(60000);
        serverConfiguration.setJournalSyncData(false);
        serverConfiguration.setProperty("journalMaxGroupWaitMSec", 0L);
        serverConfiguration.setAllowLoopback(true);
        serverConfiguration.setGcWaitTime(60000L);
        serverConfiguration.setNumAddWorkerThreads(0);
        serverConfiguration.setNumReadWorkerThreads(0);
        serverConfiguration.setNumHighPriorityWorkerThreads(0);
        serverConfiguration.setNumJournalCallbackThreads(0);
        serverConfiguration.setServerNumIOThreads(1);
        serverConfiguration.setNumLongPollWorkerThreads(1);
        serverConfiguration.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
        runZookeeper(1000);
        initializeZookeper();
        runBookies(serverConfiguration);
        if (z) {
            runStreamStorage(new CompositeConfiguration());
        }
    }

    public void start() throws Exception {
        start(false);
    }

    public void startStandalone() throws Exception {
        startStandalone(new ServerConfiguration(), false);
    }

    public void startStandalone(ServerConfiguration serverConfiguration, boolean z) throws Exception {
        LOG.debug("Local ZK/BK starting ...");
        serverConfiguration.setAdvertisedAddress(this.advertisedAddress);
        runZookeeper(1000);
        initializeZookeper();
        runBookies(serverConfiguration);
        if (z) {
            runStreamStorage(new CompositeConfiguration());
        }
    }

    public void stopBK() {
        LOG.debug("Local ZK/BK stopping ...");
        for (BookieServer bookieServer : this.bs) {
            bookieServer.shutdown();
        }
    }

    public void startBK() throws Exception {
        for (int i = 0; i < this.numberOfBookies; i++) {
            try {
                this.bs[i] = new BookieServer(this.bsConfs[i], NullStatsLogger.INSTANCE);
            } catch (BookieException.InvalidCookieException unused) {
                Iterator it = this.zkc.getChildren("/ledgers/cookies", false).iterator();
                while (it.hasNext()) {
                    this.zkc.delete("/ledgers/cookies/" + ((String) it.next()), -1);
                }
                new File(new File(this.bsConfs[i].getJournalDirNames()[0], "current"), "VERSION").delete();
                this.bs[i] = new BookieServer(this.bsConfs[i], NullStatsLogger.INSTANCE);
            }
            this.bs[i].start();
        }
    }

    public void stop() throws Exception {
        if (this.streamStorage != null) {
            LOG.debug("Local bk stream storage stopping ...");
            this.streamStorage.close();
        }
        LOG.debug("Local ZK/BK stopping ...");
        for (BookieServer bookieServer : this.bs) {
            bookieServer.shutdown();
        }
        this.zkc.close();
        this.zks.shutdown();
        this.serverFactory.shutdown();
        LOG.debug("Local ZK/BK stopped");
    }

    public static boolean waitForServerUp(String str, long j) {
        Socket socket;
        BufferedReader bufferedReader;
        long currentTimeMillis = System.currentTimeMillis();
        String[] split = str.split(":");
        String str2 = split[0];
        int parseInt = Integer.parseInt(split[1]);
        while (true) {
            try {
                socket = new Socket(str2, parseInt);
                bufferedReader = null;
            } catch (IOException e) {
                LOG.info("server " + str + " not up " + e);
            }
            try {
                OutputStream outputStream = socket.getOutputStream();
                outputStream.write("stat".getBytes());
                outputStream.flush();
                bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String readLine = bufferedReader.readLine();
                if (readLine != null && readLine.startsWith("Zookeeper version:")) {
                    LOG.info("Server UP");
                    socket.close();
                    if (bufferedReader == null) {
                        return true;
                    }
                    bufferedReader.close();
                    return true;
                }
                socket.close();
                if (bufferedReader != null) {
                    bufferedReader.close();
                }
                if (System.currentTimeMillis() > currentTimeMillis + j) {
                    return false;
                }
                try {
                    Thread.sleep(250L);
                } catch (InterruptedException unused) {
                }
            } catch (Throwable th) {
                socket.close();
                if (bufferedReader != null) {
                    bufferedReader.close();
                }
                throw th;
            }
        }
    }

    public ZooKeeper getZkClient() {
        return this.zkc;
    }

    public ZooKeeperServer getZkServer() {
        return this.zks;
    }

    public BookieServer[] getBookies() {
        return this.bs;
    }

    public int getZookeeperPort() {
        return this.zkPort;
    }
}
