package org.apache.hw_v4_0_0.hedwig.server.netty;

import java.io.File;
import java.io.IOException;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hw_v4_0_0.bookkeeper.client.BKException;
import org.apache.hw_v4_0_0.bookkeeper.client.BookKeeper;
import org.apache.hw_v4_0_0.bookkeeper.conf.ClientConfiguration;
import org.apache.hw_v4_0_0.commons.configuration.ConfigurationException;
import org.apache.hw_v4_0_0.hedwig.exceptions.PubSubException;
import org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol;
import org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration;
import org.apache.hw_v4_0_0.hedwig.server.common.TerminateJVMExceptionHandler;
import org.apache.hw_v4_0_0.hedwig.server.delivery.DeliveryManager;
import org.apache.hw_v4_0_0.hedwig.server.delivery.FIFODeliveryManager;
import org.apache.hw_v4_0_0.hedwig.server.handlers.ConsumeHandler;
import org.apache.hw_v4_0_0.hedwig.server.handlers.Handler;
import org.apache.hw_v4_0_0.hedwig.server.handlers.PublishHandler;
import org.apache.hw_v4_0_0.hedwig.server.handlers.SubscribeHandler;
import org.apache.hw_v4_0_0.hedwig.server.handlers.UnsubscribeHandler;
import org.apache.hw_v4_0_0.hedwig.server.persistence.BookkeeperPersistenceManager;
import org.apache.hw_v4_0_0.hedwig.server.persistence.LocalDBPersistenceManager;
import org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager;
import org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManagerWithRangeScan;
import org.apache.hw_v4_0_0.hedwig.server.persistence.ReadAheadCache;
import org.apache.hw_v4_0_0.hedwig.server.regions.HedwigHubClientFactory;
import org.apache.hw_v4_0_0.hedwig.server.regions.RegionManager;
import org.apache.hw_v4_0_0.hedwig.server.ssl.SslServerContextFactory;
import org.apache.hw_v4_0_0.hedwig.server.subscriptions.AbstractSubscriptionManager;
import org.apache.hw_v4_0_0.hedwig.server.subscriptions.InMemorySubscriptionManager;
import org.apache.hw_v4_0_0.hedwig.server.subscriptions.SubscriptionManager;
import org.apache.hw_v4_0_0.hedwig.server.subscriptions.ZkSubscriptionManager;
import org.apache.hw_v4_0_0.hedwig.server.topics.AbstractTopicManager;
import org.apache.hw_v4_0_0.hedwig.server.topics.TopicManager;
import org.apache.hw_v4_0_0.hedwig.server.topics.TrivialOwnAllTopicManager;
import org.apache.hw_v4_0_0.hedwig.server.topics.ZkTopicManager;
import org.apache.hw_v4_0_0.hedwig.util.ConcurrencyUtils;
import org.apache.hw_v4_0_0.hedwig.util.Either;
import org.apache.hw_v4_0_0.hedwig.zookeeper.SafeAsyncCallback;
import org.apache.hw_v4_0_0.zookkeeper.KeeperException;
import org.apache.hw_v4_0_0.zookkeeper.WatchedEvent;
import org.apache.hw_v4_0_0.zookkeeper.Watcher;
import org.apache.hw_v4_0_0.zookkeeper.ZooKeeper;
import org.jboss.hw_v4_0_0.netty.bootstrap.ServerBootstrap;
import org.jboss.hw_v4_0_0.netty.channel.group.ChannelGroup;
import org.jboss.hw_v4_0_0.netty.channel.group.DefaultChannelGroup;
import org.jboss.hw_v4_0_0.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.hw_v4_0_0.netty.channel.socket.ServerSocketChannelFactory;
import org.jboss.hw_v4_0_0.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.hw_v4_0_0.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.hw_v4_0_0.netty.logging.InternalLoggerFactory;
import org.jboss.hw_v4_0_0.netty.logging.Log4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/netty/PubSubServer.class */
public class PubSubServer {
    static Logger logger = LoggerFactory.getLogger(PubSubServer.class);
    ServerSocketChannelFactory serverChannelFactory;
    ClientSocketChannelFactory clientChannelFactory;
    ServerConfiguration conf;
    ChannelGroup allChannels;
    PersistenceManager pm;
    DeliveryManager dm;
    TopicManager tm;
    SubscriptionManager sm;
    RegionManager rm;
    ZooKeeper zk;
    BookKeeper bk;
    ScheduledExecutorService scheduler;
    public static final int RC_INVALID_CONF_FILE = 1;
    public static final int RC_MISCONFIGURED = 2;
    public static final int RC_OTHER = 3;

    protected PersistenceManager instantiatePersistenceManager(TopicManager topicManager) throws IOException, InterruptedException {
        PersistenceManagerWithRangeScan bookkeeperPersistenceManager;
        if (this.conf.isStandalone()) {
            bookkeeperPersistenceManager = LocalDBPersistenceManager.instance();
        } else {
            try {
                ClientConfiguration clientConfiguration = new ClientConfiguration();
                clientConfiguration.addConfiguration(this.conf.getConf());
                this.bk = new BookKeeper(clientConfiguration, this.zk, this.clientChannelFactory);
                bookkeeperPersistenceManager = new BookkeeperPersistenceManager(this.bk, this.zk, topicManager, this.conf, this.scheduler);
            } catch (KeeperException e) {
                logger.error("Could not instantiate bookkeeper client", e);
                throw new IOException(e);
            }
        }
        PersistenceManager persistenceManager = bookkeeperPersistenceManager;
        if (this.conf.getReadAheadEnabled()) {
            persistenceManager = new ReadAheadCache(bookkeeperPersistenceManager, this.conf).start();
        }
        return persistenceManager;
    }

    protected SubscriptionManager instantiateSubscriptionManager(TopicManager topicManager, PersistenceManager persistenceManager) {
        return this.conf.isStandalone() ? new InMemorySubscriptionManager(topicManager, persistenceManager, this.conf, this.scheduler) : new ZkSubscriptionManager(this.zk, topicManager, persistenceManager, this.conf, this.scheduler);
    }

    protected RegionManager instantiateRegionManager(PersistenceManager persistenceManager, ScheduledExecutorService scheduledExecutorService) {
        return new RegionManager(persistenceManager, this.conf, this.zk, scheduledExecutorService, new HedwigHubClientFactory(this.conf, this.clientChannelFactory));
    }

    protected void instantiateZookeeperClient() throws Exception {
        if (this.conf.isStandalone()) {
            return;
        }
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.zk = new ZooKeeper(this.conf.getZkHost(), this.conf.getZkTimeout(), new Watcher() { // from class: org.apache.hw_v4_0_0.hedwig.server.netty.PubSubServer.1
            @Override // org.apache.hw_v4_0_0.zookkeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                if (Watcher.Event.KeeperState.SyncConnected.equals(watchedEvent.getState())) {
                    countDownLatch.countDown();
                }
            }
        });
        if (countDownLatch.await(this.conf.getZkTimeout() * 2, TimeUnit.MILLISECONDS)) {
            return;
        }
        logger.error("Could not establish connection with ZooKeeper after zk_timeout*2 = " + (this.conf.getZkTimeout() * 2) + " ms. (Default value for zk_timeout is 2000).");
        throw new Exception("Could not establish connection with ZooKeeper after zk_timeout*2 = " + (this.conf.getZkTimeout() * 2) + " ms. (Default value for zk_timeout is 2000).");
    }

    protected TopicManager instantiateTopicManager() throws IOException {
        AbstractTopicManager zkTopicManager;
        if (this.conf.isStandalone()) {
            zkTopicManager = new TrivialOwnAllTopicManager(this.conf, this.scheduler);
        } else {
            try {
                zkTopicManager = new ZkTopicManager(this.zk, this.conf, this.scheduler);
            } catch (PubSubException e) {
                logger.error("Could not instantiate zk-topic manager", e);
                throw new IOException(e);
            }
        }
        return zkTopicManager;
    }

    protected Map<PubSubProtocol.OperationType, Handler> initializeNettyHandlers(TopicManager topicManager, DeliveryManager deliveryManager, PersistenceManager persistenceManager, SubscriptionManager subscriptionManager) {
        HashMap hashMap = new HashMap();
        hashMap.put(PubSubProtocol.OperationType.PUBLISH, new PublishHandler(topicManager, persistenceManager, this.conf));
        hashMap.put(PubSubProtocol.OperationType.SUBSCRIBE, new SubscribeHandler(topicManager, deliveryManager, persistenceManager, subscriptionManager, this.conf));
        hashMap.put(PubSubProtocol.OperationType.UNSUBSCRIBE, new UnsubscribeHandler(topicManager, this.conf, subscriptionManager, deliveryManager));
        hashMap.put(PubSubProtocol.OperationType.CONSUME, new ConsumeHandler(topicManager, subscriptionManager, this.conf));
        return Collections.unmodifiableMap(hashMap);
    }

    protected void initializeNetty(SslServerContextFactory sslServerContextFactory, Map<PubSubProtocol.OperationType, Handler> map) {
        boolean z = sslServerContextFactory != null;
        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
        ServerBootstrap serverBootstrap = new ServerBootstrap(this.serverChannelFactory);
        serverBootstrap.setPipelineFactory(new PubSubServerPipelineFactory(new UmbrellaHandler(this.allChannels, map, z), sslServerContextFactory, this.conf.getMaximumMessageSize()));
        serverBootstrap.setOption("child.tcpNoDelay", true);
        serverBootstrap.setOption("child.keepAlive", true);
        serverBootstrap.setOption("reuseAddress", true);
        this.allChannels.add(serverBootstrap.bind(z ? new InetSocketAddress(this.conf.getSSLServerPort()) : new InetSocketAddress(this.conf.getServerPort())));
        logger.info("Going into receive loop");
    }

    public void shutdown() {
        try {
            if (this.zk != null) {
                this.zk.close();
            }
            if (this.bk != null) {
                this.bk.close();
            }
        } catch (InterruptedException e) {
            logger.error("Error while closing ZooKeeper client!");
        } catch (BKException e2) {
            logger.error("Error while closing BookKeeper client");
        }
        this.rm.stop();
        if (this.pm instanceof ReadAheadCache) {
            ((ReadAheadCache) this.pm).stop();
        }
        if (this.dm instanceof FIFODeliveryManager) {
            ((FIFODeliveryManager) this.dm).stop();
        }
        if (this.sm instanceof AbstractSubscriptionManager) {
            ((AbstractSubscriptionManager) this.sm).stop();
        }
        this.allChannels.close().awaitUninterruptibly();
        this.serverChannelFactory.releaseExternalResources();
        this.clientChannelFactory.releaseExternalResources();
        this.scheduler.shutdown();
    }

    public PubSubServer(final ServerConfiguration serverConfiguration, final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws Exception {
        this.conf = serverConfiguration;
        serverConfiguration.validate();
        ThreadGroup threadGroup = new ThreadGroup("hedwig") { // from class: org.apache.hw_v4_0_0.hedwig.server.netty.PubSubServer.2
            @Override // java.lang.ThreadGroup, java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                uncaughtExceptionHandler.uncaughtException(thread, th);
            }
        };
        SafeAsyncCallback.setUncaughtExceptionHandler(uncaughtExceptionHandler);
        final SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(threadGroup, new Runnable() { // from class: org.apache.hw_v4_0_0.hedwig.server.netty.PubSubServer.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    PubSubServer.this.scheduler = Executors.newSingleThreadScheduledExecutor();
                    PubSubServer.this.serverChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
                    PubSubServer.this.clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
                    PubSubServer.this.instantiateZookeeperClient();
                    PubSubServer.this.tm = PubSubServer.this.instantiateTopicManager();
                    PubSubServer.this.pm = PubSubServer.this.instantiatePersistenceManager(PubSubServer.this.tm);
                    PubSubServer.this.dm = new FIFODeliveryManager(PubSubServer.this.pm, serverConfiguration);
                    PubSubServer.this.sm = PubSubServer.this.instantiateSubscriptionManager(PubSubServer.this.tm, PubSubServer.this.pm);
                    PubSubServer.this.rm = PubSubServer.this.instantiateRegionManager(PubSubServer.this.pm, PubSubServer.this.scheduler);
                    PubSubServer.this.sm.addListener(PubSubServer.this.rm);
                    PubSubServer.this.allChannels = new DefaultChannelGroup("hedwig");
                    Map<PubSubProtocol.OperationType, Handler> initializeNettyHandlers = PubSubServer.this.initializeNettyHandlers(PubSubServer.this.tm, PubSubServer.this.dm, PubSubServer.this.pm, PubSubServer.this.sm);
                    PubSubServer.this.initializeNetty(null, initializeNettyHandlers);
                    if (serverConfiguration.isSSLEnabled()) {
                        PubSubServer.this.initializeNetty(new SslServerContextFactory(serverConfiguration), initializeNettyHandlers);
                    }
                    ConcurrencyUtils.put(synchronousQueue, Either.of(new Object(), (Exception) null));
                } catch (Exception e) {
                    ConcurrencyUtils.put(synchronousQueue, Either.right(e));
                }
            }
        }).start();
        Either either = (Either) ConcurrencyUtils.take(synchronousQueue);
        if (either.left() == null) {
            throw ((Exception) either.right());
        }
    }

    public PubSubServer(ServerConfiguration serverConfiguration) throws Exception {
        this(serverConfiguration, new TerminateJVMExceptionHandler());
    }

    public static void errorMsgAndExit(String str, Throwable th, int i) {
        logger.error(str, th);
        System.err.println(str);
        System.exit(i);
    }

    public static void main(String[] strArr) {
        logger.info("Attempting to start Hedwig");
        ServerConfiguration serverConfiguration = new ServerConfiguration();
        if (strArr.length > 0) {
            String str = strArr[0];
            try {
                serverConfiguration.loadConf(new File(str).toURI().toURL());
            } catch (MalformedURLException e) {
                errorMsgAndExit("Could not open configuration file: " + str, e, 1);
            } catch (ConfigurationException e2) {
                errorMsgAndExit("Malformed configuration file: " + str, e2, 2);
            }
            logger.info("Using configuration file " + str);
        }
        try {
            new PubSubServer(serverConfiguration);
        } catch (Throwable th) {
            errorMsgAndExit("Error during startup", th, 3);
        }
    }
}
