/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hedwig.client.api.Client;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.handlers.MessageConsumeCallback;
import org.apache.hedwig.client.netty.ClientChannelPipelineFactory;
import org.apache.hedwig.client.netty.ConnectCallback;
import org.apache.hedwig.client.netty.HedwigPublisher;
import org.apache.hedwig.client.netty.HedwigSubscriber;
import org.apache.hedwig.client.netty.ResponseHandler;
import org.apache.hedwig.client.ssl.SslClientContextFactory;
import org.apache.hedwig.exceptions.PubSubException;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HedwigClientImpl
implements Client {
    private static final Logger logger = LoggerFactory.getLogger(HedwigClientImpl.class);
    protected final AtomicLong globalCounter = new AtomicLong();
    protected static final String COLON = ":";
    protected final ChannelFactory socketFactory;
    protected boolean ownChannelFactory = false;
    private ClientChannelPipelineFactory pipelineFactory;
    protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = new ConcurrentHashMap<ByteString, InetSocketAddress>();
    private final ConcurrentMap<InetSocketAddress, List<ByteString>> host2Topics = new ConcurrentHashMap<InetSocketAddress, List<ByteString>>();
    private final Timer clientTimer = new Timer(true);
    private boolean isStopped = false;
    private HedwigSubscriber sub;
    private final HedwigPublisher pub;
    private final ClientConfiguration cfg;
    private final MessageConsumeCallback consumeCb;
    private SslClientContextFactory sslFactory = null;

    public static Client create(ClientConfiguration cfg) {
        return new HedwigClientImpl(cfg);
    }

    public static Client create(ClientConfiguration cfg, ChannelFactory socketFactory) {
        return new HedwigClientImpl(cfg, socketFactory);
    }

    protected HedwigClientImpl(ClientConfiguration cfg) {
        this(cfg, (ChannelFactory)new NioClientSocketChannelFactory((Executor)Executors.newCachedThreadPool(), (Executor)Executors.newCachedThreadPool()));
        this.ownChannelFactory = true;
    }

    protected HedwigClientImpl(ClientConfiguration cfg, ChannelFactory socketFactory) {
        this.cfg = cfg;
        this.socketFactory = socketFactory;
        this.pub = new HedwigPublisher(this);
        this.sub = new HedwigSubscriber(this);
        this.pipelineFactory = new ClientChannelPipelineFactory(this);
        this.consumeCb = new MessageConsumeCallback(this);
        if (cfg.isSSLEnabled()) {
            this.sslFactory = new SslClientContextFactory(cfg);
        }
        this.clientTimer.schedule((TimerTask)new PubSubRequestTimeoutTask(), 0L, cfg.getTimeoutThreadRunInterval());
    }

    public ClientConfiguration getConfiguration() {
        return this.cfg;
    }

    @Override
    public HedwigSubscriber getSubscriber() {
        return this.sub;
    }

    protected void setSubscriber(HedwigSubscriber sub) {
        this.sub = sub;
    }

    @Override
    public HedwigPublisher getPublisher() {
        return this.pub;
    }

    public MessageConsumeCallback getConsumeCallback() {
        return this.consumeCb;
    }

    public SslClientContextFactory getSslFactory() {
        return this.sslFactory;
    }

    @Override
    public void close() {
        logger.info("Stopping the client!");
        this.isStopped = true;
        this.clientTimer.cancel();
        this.pub.close();
        this.sub.close();
        this.topic2Host.clear();
        this.host2Topics.clear();
        if (this.ownChannelFactory) {
            this.socketFactory.releaseExternalResources();
        }
        logger.info("Completed stopping the client!");
    }

    public void doConnect(PubSubData pubSubData, InetSocketAddress serverHost) {
        if (logger.isDebugEnabled()) {
            logger.debug("Connecting to host: " + serverHost + " with pubSubData: " + pubSubData);
        }
        ClientBootstrap bootstrap = new ClientBootstrap(this.socketFactory);
        bootstrap.setPipelineFactory((ChannelPipelineFactory)this.pipelineFactory);
        bootstrap.setOption("tcpNoDelay", (Object)true);
        bootstrap.setOption("keepAlive", (Object)true);
        ChannelFuture future = bootstrap.connect((SocketAddress)serverHost);
        future.addListener((ChannelFutureListener)new ConnectCallback(pubSubData, serverHost, this));
    }

    protected void storeTopic2HostMapping(PubSubData pubSubData, Channel channel) {
        InetSocketAddress host = HedwigClientImpl.getHostFromChannel(channel);
        if (this.topic2Host.containsKey(pubSubData.topic) && ((InetSocketAddress)this.topic2Host.get(pubSubData.topic)).equals(host)) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Storing info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: " + this.topic2Host.get(pubSubData.topic) + ", new host: " + host);
        }
        this.topic2Host.put(pubSubData.topic, host);
        if (this.host2Topics.containsKey(host)) {
            ((List)this.host2Topics.get(host)).add(pubSubData.topic);
        } else {
            LinkedList<ByteString> topicsList = new LinkedList<ByteString>();
            topicsList.add(pubSubData.topic);
            this.host2Topics.put(host, topicsList);
        }
    }

    public static InetSocketAddress getHostFromChannel(Channel channel) {
        return (InetSocketAddress)channel.getRemoteAddress();
    }

    public static ResponseHandler getResponseHandlerFromChannel(Channel channel) {
        return (ResponseHandler)channel.getPipeline().getLast();
    }

    public InetSocketAddress getHostForTopic(ByteString topic) {
        return (InetSocketAddress)this.topic2Host.get(topic);
    }

    public void clearAllTopicsForHost(InetSocketAddress host) {
        if (logger.isDebugEnabled()) {
            logger.debug("Clearing all topics for host: " + host);
        }
        if (this.host2Topics.containsKey(host)) {
            for (ByteString topic : (List)this.host2Topics.get(host)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Removing mapping for topic: " + topic.toStringUtf8() + " from host: " + host);
                }
                this.topic2Host.remove(topic);
            }
            this.host2Topics.remove(host);
        }
    }

    public boolean hasStopped() {
        return this.isStopped;
    }

    public Timer getClientTimer() {
        return this.clientTimer;
    }

    class PubSubRequestTimeoutTask
    extends TimerTask {
        PubSubRequestTimeoutTask() {
        }

        @Override
        public void run() {
            ResponseHandler responseHandler;
            if (logger.isDebugEnabled()) {
                logger.debug("Running the PubSubRequest Timeout Task");
            }
            long curTime = System.currentTimeMillis();
            long timeoutInterval = HedwigClientImpl.this.cfg.getServerAckResponseTimeout();
            for (Channel channel : ((HedwigClientImpl)HedwigClientImpl.this).pub.host2Channel.values()) {
                responseHandler = HedwigClientImpl.getResponseHandlerFromChannel(channel);
                for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
                    this.checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
                }
            }
            for (Channel channel : ((HedwigClientImpl)HedwigClientImpl.this).sub.topicSubscriber2Channel.values()) {
                responseHandler = HedwigClientImpl.getResponseHandlerFromChannel(channel);
                for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
                    this.checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
                }
            }
        }

        private void checkPubSubDataToTimeOut(PubSubData pubSubData, ResponseHandler responseHandler, long curTime, long timeoutInterval) {
            if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
                logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
                responseHandler.txn2PubSubData.remove(pubSubData.txnId);
                pubSubData.callback.operationFailed(pubSubData.context, (PubSubException)new PubSubException.UncertainStateException("Server ack response never received so PubSubRequest has timed out!"));
            }
        }
    }
}

