/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.zookeeper;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.pulsar.shade.org.apache.bookkeeper.bookie.BookieException;
import org.apache.pulsar.shade.org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.exceptions.NamespaceExistsException;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.Backoff;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookieServer;
import org.apache.pulsar.shade.org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.protocol.ProtocolConstants;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.pulsar.shade.org.apache.commons.configuration.CompositeConfiguration;
import org.apache.pulsar.shade.org.apache.commons.io.FileUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.WatchedEvent;
import org.apache.pulsar.shade.org.apache.zookeeper.Watcher;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.shade.org.apache.zookeeper.server.DatadirCleanupManager;
import org.apache.pulsar.shade.org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.pulsar.shade.org.apache.zookeeper.server.ServerCnxn;
import org.apache.pulsar.shade.org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalBookkeeperEnsemble {
    protected static final Logger LOG = LoggerFactory.getLogger(LocalBookkeeperEnsemble.class);
    public static final int CONNECTION_TIMEOUT = 30000;
    int numberOfBookies;
    private final boolean clearOldData;
    private final Supplier<Integer> portManager;
    private String hostPort;
    private final String advertisedAddress;
    private int zkPort;
    NIOServerCnxnFactory serverFactory;
    ZooKeeperServer zks;
    DatadirCleanupManager zkDataCleanupManager;
    ZooKeeper zkc;
    static int zkSessionTimeOut = 5000;
    String zkDataDirName;
    String bkDataDirName;
    BookieServer[] bs;
    ServerConfiguration[] bsConfs;
    StreamStorageLifecycleComponent streamStorage;
    Integer streamStoragePort = 4181;
    List<File> temporaryDirectories = new ArrayList<File>();

    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier<Integer> portManager) {
        this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager);
    }

    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName, String bkDataDirName, boolean clearOldData) {
        this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, null);
    }

    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName, String bkDataDirName, boolean clearOldData, String advertisedAddress) {
        this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress);
    }

    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, int streamStoragePort, String zkDataDirName, String bkDataDirName, boolean clearOldData, String advertisedAddress) {
        this(numberOfBookies, zkPort, streamStoragePort, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress, new BasePortManager(bkBasePort));
    }

    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int streamStoragePort, String zkDataDirName, String bkDataDirName, boolean clearOldData, String advertisedAddress, Supplier<Integer> portManager) {
        this.numberOfBookies = numberOfBookies;
        this.portManager = portManager;
        this.streamStoragePort = streamStoragePort;
        this.zkDataDirName = zkDataDirName;
        this.bkDataDirName = bkDataDirName;
        this.clearOldData = clearOldData;
        this.zkPort = zkPort;
        this.advertisedAddress = null == advertisedAddress ? "127.0.0.1" : advertisedAddress;
        LOG.info("Running {} bookie(s) and advertised them at {}.", (Object)this.numberOfBookies, (Object)this.advertisedAddress);
    }

    private File createTempDirectory(String seed) throws IOException {
        File res = Files.createTempDirectory(seed, new FileAttribute[0]).toFile();
        this.temporaryDirectories.add(res);
        return res;
    }

    private void runZookeeper(int maxCC) throws IOException {
        File zkDataDir;
        LOG.info("Starting ZK server");
        File file = zkDataDir = StringUtils.isNotBlank(this.zkDataDirName) ? Files.createDirectories(Paths.get(this.zkDataDirName, new String[0]), new FileAttribute[0]).toFile() : this.createTempDirectory("zktest");
        if (this.clearOldData) {
            FileUtils.cleanDirectory(zkDataDir);
        }
        try {
            System.setProperty("zookeeper.4lw.commands.whitelist", "*");
            this.zks = new ZooKeeperServer(zkDataDir, zkDataDir, 3000);
            this.serverFactory = new NIOServerCnxnFactory();
            this.serverFactory.configure(new InetSocketAddress(this.zkPort), maxCC);
            this.serverFactory.startup(this.zks);
            this.zkDataCleanupManager = new DatadirCleanupManager(zkDataDir, zkDataDir, 3, 1);
            this.zkDataCleanupManager.start();
        }
        catch (Exception e) {
            LOG.error("Exception while instantiating ZooKeeper", (Throwable)e);
            if (this.serverFactory != null) {
                this.serverFactory.shutdown();
            }
            throw new IOException(e);
        }
        this.zkPort = this.serverFactory.getLocalPort();
        this.hostPort = "127.0.0.1:" + this.zkPort;
        boolean b = LocalBookkeeperEnsemble.waitForServerUp(this.hostPort, 30000L);
        LOG.info("ZooKeeper server up: {}", (Object)b);
        LOG.debug("Local ZK started (port: {}, data_directory: {})", (Object)this.zkPort, (Object)zkDataDir.getAbsolutePath());
    }

    public void disconnectZookeeper(ZooKeeper zooKeeper) {
        ServerCnxn serverCnxn = this.getZookeeperServerConnection(zooKeeper);
        try {
            LOG.info("disconnect ZK server side connection {}", (Object)serverCnxn);
            Class<?> disconnectReasonClass = Class.forName("org.apache.pulsar.shade.org.apache.zookeeper.server.ServerCnxn$DisconnectReason");
            Method method = serverCnxn.getClass().getMethod("close", disconnectReasonClass);
            method.invoke((Object)serverCnxn, Stream.of(disconnectReasonClass.getEnumConstants()).filter(s -> s.toString().equals("CONNECTION_CLOSE_FORCED")).findFirst().get());
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    public ServerCnxn getZookeeperServerConnection(ZooKeeper zooKeeper) {
        return StreamSupport.stream(this.serverFactory.getConnections().spliterator(), false).filter(cnxn -> cnxn.getSessionId() == zooKeeper.getSessionId()).findFirst().orElse(null);
    }

    private void initializeZookeper() throws IOException {
        LOG.info("Instantiate ZK Client");
        try {
            ZKConnectionWatcher zkConnectionWatcher = new ZKConnectionWatcher();
            this.zkc = new ZooKeeper(this.hostPort, zkSessionTimeOut, zkConnectionWatcher);
            zkConnectionWatcher.waitForConnection();
            if (this.zkc.exists("/ledgers", false) == null) {
                this.zkc.create("/ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (this.zkc.exists("/ledgers/available", false) == null) {
                this.zkc.create("/ledgers/available", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (this.zkc.exists("/ledgers/available/readonly", false) == null) {
                this.zkc.create("/ledgers/available/readonly", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (this.zkc.exists("/bookies", false) == null) {
                this.zkc.create("/bookies", "{}".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }
        catch (KeeperException e) {
            LOG.error("Exception while creating znodes", (Throwable)e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while creating znodes", (Throwable)e);
        }
    }

    private void runBookies(ServerConfiguration baseConf) throws Exception {
        LOG.info("Starting Bookie(s)");
        this.bs = new BookieServer[this.numberOfBookies];
        this.bsConfs = new ServerConfiguration[this.numberOfBookies];
        for (int i = 0; i < this.numberOfBookies; ++i) {
            File bkDataDir;
            File file = bkDataDir = StringUtils.isNotBlank(this.bkDataDirName) ? Files.createDirectories(Paths.get(this.bkDataDirName + i, new String[0]), new FileAttribute[0]).toFile() : this.createTempDirectory("bk" + Integer.toString(i) + "test");
            if (this.clearOldData) {
                FileUtils.cleanDirectory(bkDataDir);
            }
            int bookiePort = this.portManager.get();
            String registrationZnode = String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort);
            if (this.zkc.exists(registrationZnode, null) != null) {
                try {
                    this.zkc.delete(registrationZnode, -1);
                }
                catch (KeeperException.NoNodeException noNodeException) {
                    // empty catch block
                }
            }
            this.bsConfs[i] = new ServerConfiguration(baseConf);
            this.bsConfs[i].setBookiePort(bookiePort);
            this.bsConfs[i].setZkServers("127.0.0.1:" + this.zkPort);
            this.bsConfs[i].setJournalDirName(bkDataDir.getPath());
            this.bsConfs[i].setLedgerDirNames(new String[]{bkDataDir.getPath()});
            this.bsConfs[i].setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
            this.bsConfs[i].setAllowEphemeralPorts(true);
            try {
                this.bs[i] = new BookieServer(this.bsConfs[i], NullStatsLogger.INSTANCE, null);
            }
            catch (BookieException.InvalidCookieException e) {
                for (String path : this.zkc.getChildren("/ledgers/cookies", false)) {
                    this.zkc.delete("/ledgers/cookies/" + path, -1);
                }
                new File(new File(bkDataDir, "current"), "VERSION").delete();
                this.bs[i] = new BookieServer(this.bsConfs[i], NullStatsLogger.INSTANCE, null);
            }
            this.bs[i].start();
            LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", new Object[]{i, bookiePort, bkDataDir.getAbsolutePath()});
        }
    }

    public void runStreamStorage(CompositeConfiguration conf) throws Exception {
        String zkServers = "127.0.0.1:" + this.zkPort;
        String metadataServiceUriStr = "zk://" + zkServers + "/ledgers";
        URI metadataServiceUri = URI.create(metadataServiceUriStr);
        conf.setProperty("metadataServiceUri", metadataServiceUriStr);
        conf.setProperty("dlog.bkcEnsembleSize", 1);
        conf.setProperty("dlog.bkcWriteQuorumSize", 1);
        conf.setProperty("dlog.bkcAckQuorumSize", 1);
        conf.setProperty("storageserver.grpc.port", this.streamStoragePort);
        conf.setProperty("storage.range.store.dirs", this.bkDataDirName + "/ranges/data");
        ZkClusterInitializer initializer = new ZkClusterInitializer(zkServers);
        initializer.initializeCluster(metadataServiceUri, 2);
        ServerConfiguration serverConf = new ServerConfiguration();
        serverConf.loadConf(conf);
        BookieConfiguration bkConf = new BookieConfiguration(serverConf);
        this.streamStorage = new StreamStorageLifecycleComponent(bkConf, NullStatsLogger.INSTANCE);
        this.streamStorage.start();
        LOG.debug("Local BK stream storage started (port: {})", (Object)this.streamStoragePort);
        try (StorageAdminClient admin = StorageClientBuilder.newBuilder().withSettings(StorageClientSettings.newBuilder().serviceUri("bk://localhost:4181").backoffPolicy(Backoff.Jitter.of(Backoff.Jitter.Type.EXPONENTIAL, 1000L, 10000L, 30L)).build()).buildAdmin();){
            try {
                NamespaceProperties ns = FutureUtils.result(admin.getNamespace("default"));
                LOG.info("'default' namespace for table service : {}", (Object)ns);
            }
            catch (NamespaceNotFoundException nnfe) {
                LOG.info("Creating default namespace");
                try {
                    NamespaceProperties ns = FutureUtils.result(admin.createNamespace("default", NamespaceConfiguration.newBuilder().setDefaultStreamConf(ProtocolConstants.DEFAULT_STREAM_CONF).build()));
                    LOG.info("Successfully created 'default' namespace :\n{}", (Object)ns);
                }
                catch (NamespaceExistsException nee) {
                    LOG.warn("Namespace 'default' already existed.");
                }
            }
        }
    }

    public void start(boolean enableStreamStorage) throws Exception {
        LOG.debug("Local ZK/BK starting ...");
        ServerConfiguration conf = new ServerConfiguration();
        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
        conf.setProperty("dbStorage_writeCacheMaxSizeMb", 2);
        conf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 1);
        conf.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 1);
        conf.setProperty("dbStorage_rocksDB_blockCacheSize", 0x100000);
        conf.setFlushInterval(60000);
        conf.setJournalSyncData(false);
        conf.setProperty("journalMaxGroupWaitMSec", 0L);
        conf.setAllowLoopback(true);
        conf.setGcWaitTime(60000L);
        conf.setNumAddWorkerThreads(0);
        conf.setNumReadWorkerThreads(0);
        conf.setNumHighPriorityWorkerThreads(0);
        conf.setNumJournalCallbackThreads(0);
        conf.setServerNumIOThreads(1);
        conf.setNumLongPollWorkerThreads(1);
        conf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
        this.runZookeeper(1000);
        this.initializeZookeper();
        this.runBookies(conf);
        if (enableStreamStorage) {
            this.runStreamStorage(new CompositeConfiguration());
        }
    }

    public void start() throws Exception {
        this.start(false);
    }

    public void startStandalone() throws Exception {
        this.startStandalone(new ServerConfiguration(), false);
    }

    public void startStandalone(ServerConfiguration conf, boolean enableStreamStorage) throws Exception {
        LOG.debug("Local ZK/BK starting ...");
        conf.setAdvertisedAddress(this.advertisedAddress);
        this.runZookeeper(1000);
        this.initializeZookeper();
        this.runBookies(conf);
        if (enableStreamStorage) {
            this.runStreamStorage(new CompositeConfiguration());
        }
    }

    public void stopBK(int i) {
        this.bs[i].shutdown();
    }

    public void stopBK() {
        LOG.debug("Local ZK/BK stopping ...");
        for (BookieServer bookie : this.bs) {
            bookie.shutdown();
        }
    }

    public void startBK(int i) throws Exception {
        try {
            this.bs[i] = new BookieServer(this.bsConfs[i], NullStatsLogger.INSTANCE, null);
        }
        catch (BookieException.InvalidCookieException e) {
            for (String path : this.zkc.getChildren("/ledgers/cookies", false)) {
                this.zkc.delete("/ledgers/cookies/" + path, -1);
            }
            new File(new File(this.bsConfs[i].getJournalDirNames()[0], "current"), "VERSION").delete();
            this.bs[i] = new BookieServer(this.bsConfs[i], NullStatsLogger.INSTANCE, null);
        }
        this.bs[i].start();
    }

    public void startBK() throws Exception {
        for (int i = 0; i < this.numberOfBookies; ++i) {
            this.startBK(i);
        }
    }

    public void stop() throws Exception {
        if (null != this.streamStorage) {
            LOG.debug("Local bk stream storage stopping ...");
            this.streamStorage.close();
        }
        LOG.debug("Local ZK/BK stopping ...");
        for (BookieServer bookie : this.bs) {
            try {
                bookie.shutdown();
            }
            catch (Exception e) {
                LOG.warn("failed to shutdown bookie", (Throwable)e);
            }
        }
        this.zkc.close();
        this.zks.shutdown();
        this.serverFactory.shutdown();
        if (this.zkDataCleanupManager != null) {
            this.zkDataCleanupManager.shutdown();
        }
        LOG.debug("Local ZK/BK stopped");
        for (File managedDir : this.temporaryDirectories) {
            LOG.info("deleting test directory {}", (Object)managedDir);
            FileUtils.deleteDirectory(managedDir);
        }
        this.temporaryDirectories.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public static boolean waitForServerUp(String hp, long timeout) {
        long start = System.currentTimeMillis();
        String[] split = hp.split(":");
        String host = split[0];
        int port = Integer.parseInt(split[1]);
        while (true) {
            try {
                Socket sock = new Socket(host, port);
                BufferedReader reader = null;
                try {
                    OutputStream outstream = sock.getOutputStream();
                    outstream.write("stat".getBytes());
                    outstream.flush();
                    reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
                    String line = reader.readLine();
                    if (line != null && line.startsWith("Zookeeper version:")) {
                        LOG.info("Server UP");
                        boolean bl = true;
                        return bl;
                    }
                }
                finally {
                    sock.close();
                    if (reader != null) {
                        reader.close();
                    }
                }
            }
            catch (IOException e) {
                LOG.info("server " + hp + " not up " + e);
            }
            if (System.currentTimeMillis() > start + timeout) {
                return false;
            }
            try {
                Thread.sleep(250L);
            }
            catch (InterruptedException interruptedException) {
            }
        }
    }

    public ZooKeeper getZkClient() {
        return this.zkc;
    }

    public ZooKeeperServer getZkServer() {
        return this.zks;
    }

    public BookieServer[] getBookies() {
        return this.bs;
    }

    public int getZookeeperPort() {
        return this.zkPort;
    }

    private static class BasePortManager
    implements Supplier<Integer> {
        private int port;

        public BasePortManager(int basePort) {
            this.port = basePort;
        }

        @Override
        public synchronized Integer get() {
            return this.port++;
        }
    }

    public static class ZKConnectionWatcher
    implements Watcher {
        private final CountDownLatch clientConnectLatch = new CountDownLatch(1);

        @Override
        public void process(WatchedEvent event) {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                this.clientConnectLatch.countDown();
            }
        }

        public void waitForConnection() throws IOException {
            try {
                if (!this.clientConnectLatch.await(zkSessionTimeOut, TimeUnit.MILLISECONDS)) {
                    throw new IOException("Couldn't connect to zookeeper server");
                }
            }
            catch (InterruptedException e) {
                throw new IOException("Interrupted when connecting to zookeeper server", e);
            }
        }
    }
}

