/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hedwig.client.api.Publisher;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.handlers.PubSubCallback;
import org.apache.hedwig.client.netty.HedwigClientImpl;
import org.apache.hedwig.client.netty.WriteCallback;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.Callback;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Publisher} implementation that sends PUBLISH requests over Netty
 * channels, keeping one stored channel per server host in {@link #host2Channel}.
 *
 * <p>The {@code closed} flag is only read and written while holding this
 * object's monitor; the channel map itself is a concurrent map.
 */
public class HedwigPublisher implements Publisher {
    private static final Logger logger = LoggerFactory.getLogger(HedwigPublisher.class);

    /** Channels stored by this publisher, keyed by the server host they are connected to. */
    protected final ConcurrentMap<InetSocketAddress, Channel> host2Channel = new ConcurrentHashMap<InetSocketAddress, Channel>();
    private final HedwigClientImpl client;
    private final ClientConfiguration cfg;
    /** Set by {@link #close()}; guarded by {@code this}. */
    private boolean closed = false;

    /**
     * Creates a publisher bound to the given client and its configuration.
     *
     * @param client the owning client; its configuration is read once here
     */
    protected HedwigPublisher(HedwigClientImpl client) {
        this.client = client;
        this.cfg = client.getConfiguration();
    }

    /**
     * Publishes a message and blocks until the underlying async publish reports completion.
     *
     * @param topic topic to publish to
     * @param msg   message to publish
     * @throws PubSubException.CouldNotConnectException if the async publish failed with that exception
     * @throws PubSubException.ServiceDownException     if the async publish failed with that exception,
     *         failed with any other (or no) exception, or the calling thread was interrupted while
     *         waiting; in the interrupted case the thread's interrupt status is restored before throwing
     */
    @Override
    public void publish(ByteString topic, PubSubProtocol.Message msg) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        if (logger.isDebugEnabled()) {
            logger.debug("Calling a sync publish for topic: " + topic.toStringUtf8() + ", msg: " + msg);
        }
        PubSubData pubSubData = new PubSubData(topic, msg, null, PubSubProtocol.OperationType.PUBLISH, null, null, null);
        // The monitor is taken before the async call is issued and is only
        // released inside wait(), while this thread polls pubSubData.isDone.
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            this.asyncPublish(topic, msg, pubSubCallback, null);
            try {
                while (!pubSubData.isDone) {
                    pubSubData.wait();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status so callers further up the stack
                // can still observe that this thread was interrupted.
                Thread.currentThread().interrupt();
                throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for async publish call");
            }
            if (pubSubCallback.getIsCallSuccessful()) {
                return;
            }
            PubSubException failureException = pubSubCallback.getFailureException();
            if (failureException == null) {
                logger.error("Sync Publish operation failed but no PubSubException was passed!");
                throw new PubSubException.ServiceDownException("Server ack response to publish request is not successful");
            }
            if (failureException instanceof PubSubException.CouldNotConnectException) {
                throw (PubSubException.CouldNotConnectException)failureException;
            }
            if (failureException instanceof PubSubException.ServiceDownException) {
                throw (PubSubException.ServiceDownException)failureException;
            }
            // Any other failure type is outside this method's declared
            // exceptions, so it is logged and reported as ServiceDown.
            logger.error("Unexpected exception type when a sync publish operation failed: " + failureException);
            throw new PubSubException.ServiceDownException("Server ack response to publish request is not successful");
        }
    }

    /**
     * Publishes a message without blocking.
     *
     * <p>The target host is the one mapped to the topic in the client's
     * {@code topic2Host} map, or the configured default server host when there is
     * no mapping. If a channel is already stored for that host the request is
     * written on it; otherwise the client is asked to connect to the host.
     *
     * @param topic    topic to publish to
     * @param msg      message to publish
     * @param callback callback stored in the request's {@link PubSubData}
     * @param context  caller context stored in the request's {@link PubSubData}
     */
    @Override
    public void asyncPublish(ByteString topic, PubSubProtocol.Message msg, Callback<Void> callback, Object context) {
        if (logger.isDebugEnabled()) {
            logger.debug("Calling an async publish for topic: " + topic.toStringUtf8() + ", msg: " + msg);
        }
        PubSubData pubSubData = new PubSubData(topic, msg, null, PubSubProtocol.OperationType.PUBLISH, null, callback, context);
        // One get() per map instead of containsKey() followed by get(): an entry
        // removed between the two calls would otherwise surface as a null host
        // or a null channel further down.
        InetSocketAddress host = (InetSocketAddress)this.client.topic2Host.get(topic);
        if (host == null) {
            host = this.cfg.getDefaultServerHost();
        }
        Channel channel = this.host2Channel.get(host);
        if (channel != null) {
            this.doPublish(pubSubData, channel);
        } else {
            this.client.doConnect(pubSubData, host);
        }
    }

    /**
     * Builds a PUBLISH request from {@code pubSubData} and writes it to {@code channel}.
     *
     * <p>A new transaction id is taken from the client's global counter and
     * recorded on {@code pubSubData} together with the current time. The data is
     * registered under that id in the channel's response handler before the
     * write is issued, and a {@link WriteCallback} is attached to the write future.
     *
     * @param pubSubData request state; its {@code txnId} and {@code requestWriteTime} are updated
     * @param channel    channel to write the request to
     */
    protected void doPublish(PubSubData pubSubData, Channel channel) {
        long txnId = this.client.globalCounter.incrementAndGet();

        PubSubProtocol.PubSubRequest.Builder pubsubRequestBuilder = PubSubProtocol.PubSubRequest.newBuilder();
        pubsubRequestBuilder.setProtocolVersion(PubSubProtocol.ProtocolVersion.VERSION_ONE);
        pubsubRequestBuilder.setType(PubSubProtocol.OperationType.PUBLISH);
        if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
            pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers);
        }
        pubsubRequestBuilder.setTxnId(txnId);
        pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim);
        pubsubRequestBuilder.setTopic(pubSubData.topic);

        PubSubProtocol.PublishRequest.Builder publishRequestBuilder = PubSubProtocol.PublishRequest.newBuilder();
        publishRequestBuilder.setMsg(pubSubData.msg);
        pubsubRequestBuilder.setPublishRequest(publishRequestBuilder);

        pubSubData.txnId = txnId;
        pubSubData.requestWriteTime = System.currentTimeMillis();
        // Registered before the write is issued.
        // NOTE(review): presumably the response handler looks requests up by txn id - confirm.
        HedwigClientImpl.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
        if (logger.isDebugEnabled()) {
            logger.debug("Writing a Publish request to host: " + HedwigClientImpl.getHostFromChannel(channel) + " for pubSubData: " + pubSubData);
        }
        ChannelFuture future = channel.write(pubsubRequestBuilder.build());
        future.addListener(new WriteCallback(pubSubData, this.client));
    }

    /**
     * Stores {@code channel} as the channel for its host, unless this publisher
     * has been closed or a channel is already stored for that host. In either of
     * those cases the given channel is flagged as explicitly closed on its
     * response handler and then closed.
     *
     * @param channel newly available channel
     */
    protected synchronized void storeHost2ChannelMapping(Channel channel) {
        InetSocketAddress host = HedwigClientImpl.getHostFromChannel(channel);
        // putIfAbsent makes the "store only if missing" step a single atomic map operation.
        if (!this.closed && this.host2Channel.putIfAbsent(host, channel) == null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Storing a new Channel mapping for host: " + host);
            }
            return;
        }
        if (logger.isDebugEnabled()) {
            if (this.closed) {
                logger.debug("Publisher is closed so not storing the Channel mapping to host: " + host);
            } else {
                logger.debug("Channel mapping to host: " + host + " already exists so no need to store it.");
            }
        }
        HedwigClientImpl.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
        channel.close();
    }

    /**
     * Returns the channel currently stored for {@code host}.
     *
     * @param host server host to look up
     * @return the stored channel, or {@code null} if none is stored
     */
    public Channel getChannelForHost(InetSocketAddress host) {
        return this.host2Channel.get(host);
    }

    /**
     * Marks this publisher closed, then flags every stored channel as explicitly
     * closed, closes it (waiting uninterruptibly for each close to complete),
     * and finally clears the channel map.
     */
    void close() {
        // Only the flag update is done under the monitor; the blocking channel
        // closes below run after it has been released.
        synchronized (this) {
            this.closed = true;
        }
        for (Channel channel : this.host2Channel.values()) {
            HedwigClientImpl.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
            channel.close().awaitUninterruptibly();
        }
        this.host2Channel.clear();
    }
}

