/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty.impl;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
import org.apache.hedwig.client.netty.impl.HChannelHandler;
import org.apache.hedwig.client.netty.impl.WriteCallback;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HChannelImpl
implements HChannel {
    private static Logger logger = LoggerFactory.getLogger(HChannelImpl.class);
    InetSocketAddress host;
    final AbstractHChannelManager channelManager;
    final ClientChannelPipelineFactory pipelineFactory;
    volatile Channel channel;
    volatile State state;
    volatile boolean closed = false;
    Queue<PubSubData> pendingOps = new ArrayDeque<PubSubData>();

    protected HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager) {
        this(host, channelManager, null);
    }

    public HChannelImpl(InetSocketAddress host, AbstractHChannelManager channelManager, ClientChannelPipelineFactory pipelineFactory) {
        this(host, null, channelManager, pipelineFactory);
        this.state = State.DISCONNECTED;
    }

    public HChannelImpl(InetSocketAddress host, Channel channel, AbstractHChannelManager channelManager, ClientChannelPipelineFactory pipelineFactory) {
        this.host = host;
        this.channel = channel;
        this.channelManager = channelManager;
        this.pipelineFactory = pipelineFactory;
        this.state = State.CONNECTED;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void submitOp(PubSubData pubSubData) {
        boolean doOpNow = false;
        if (null != this.channel && State.CONNECTED == this.state) {
            doOpNow = true;
        } else {
            HChannelImpl hChannelImpl = this;
            synchronized (hChannelImpl) {
                if (null != this.channel && State.CONNECTED == this.state) {
                    doOpNow = true;
                } else {
                    this.pendingOps.add(pubSubData);
                }
            }
            if (!doOpNow) {
                this.connect();
            }
        }
        if (doOpNow) {
            this.executeOpAfterConnected(pubSubData);
        }
    }

    private void executeOpAfterConnected(PubSubData pubSubData) {
        PubSubProtocol.PubSubRequest.Builder reqBuilder = NetUtils.buildPubSubRequest(this.channelManager.nextTxnId(), pubSubData);
        this.writePubSubRequest(pubSubData, reqBuilder.build());
    }

    @Override
    public Channel getChannel() {
        return this.channel;
    }

    private void writePubSubRequest(PubSubData pubSubData, PubSubProtocol.PubSubRequest pubSubRequest) {
        if (this.closed || null == this.channel || State.CONNECTED != this.state) {
            this.retryOrFailOp(pubSubData);
            return;
        }
        try {
            HChannelImpl.getHChannelHandlerFromChannel(this.channel).addTxn(pubSubData.txnId, pubSubData);
        }
        catch (NoResponseHandlerException nrhe) {
            logger.warn("No Channel Handler found for channel {} when writing request. It might already disconnect.", (Object)this.channel);
            return;
        }
        logger.debug("Writing a {} request to host: {} for pubSubData: {}.", VarArgs.va(pubSubData.operationType, this.host, pubSubData));
        ChannelFuture future = this.channel.write((Object)pubSubRequest);
        future.addListener((ChannelFutureListener)new WriteCallback(pubSubData, this.channelManager));
    }

    protected void retryOrFailOp(PubSubData pubSubData) {
        ByteString hostString = ByteString.copyFromUtf8((String)HedwigSocketAddress.sockAddrStr(this.host));
        if (pubSubData.connectFailedServers != null && pubSubData.connectFailedServers.contains(hostString)) {
            logger.error("Error connecting to host {} more than once so fail the request: {}", VarArgs.va(this.host, pubSubData));
            pubSubData.getCallback().operationFailed(pubSubData.context, (PubSubException)new PubSubException.CouldNotConnectException("Could not connect to host: " + this.host));
        } else {
            logger.error("Retry to connect to default hub server again for pubSubData: {}", (Object)pubSubData);
            if (pubSubData.connectFailedServers == null) {
                pubSubData.connectFailedServers = new LinkedList<ByteString>();
            }
            pubSubData.connectFailedServers.add(hostString);
            this.channelManager.submitOpToDefaultServer(pubSubData);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onChannelConnected(ChannelFuture future) {
        Queue<PubSubData> oldPendingOps;
        HChannelImpl hChannelImpl = this;
        synchronized (hChannelImpl) {
            if (this.closed) {
                future.getChannel().close();
                return;
            }
            this.state = State.CONNECTED;
            this.channel = future.getChannel();
            this.host = NetUtils.getHostFromChannel(this.channel);
            oldPendingOps = this.pendingOps;
            this.pendingOps = new ArrayDeque<PubSubData>();
        }
        for (PubSubData op : oldPendingOps) {
            this.executeOpAfterConnected(op);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onChannelConnectFailure() {
        Queue<PubSubData> oldPendingOps;
        HChannelImpl hChannelImpl = this;
        synchronized (hChannelImpl) {
            this.state = State.DISCONNECTED;
            this.channel = null;
            oldPendingOps = this.pendingOps;
            this.pendingOps = new ArrayDeque<PubSubData>();
        }
        for (PubSubData op : oldPendingOps) {
            this.retryOrFailOp(op);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void connect() {
        HChannelImpl hChannelImpl = this;
        synchronized (hChannelImpl) {
            if (State.CONNECTING == this.state || State.CONNECTED == this.state) {
                return;
            }
            this.state = State.CONNECTING;
        }
        ChannelFuture future = this.connect(this.host, this.pipelineFactory);
        future.addListener(new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (HChannelImpl.this.closed) {
                    future.getChannel().close();
                    return;
                }
                if (!future.isSuccess()) {
                    logger.error("Error connecting to host {}.", (Object)HChannelImpl.this.host);
                    future.getChannel().close();
                    HChannelImpl.this.onChannelConnectFailure();
                    return;
                }
                logger.debug("Connected to server {}.", (Object)HChannelImpl.this.host);
                HChannelImpl.this.onChannelConnected(future);
            }
        });
    }

    protected ChannelFuture connect(InetSocketAddress serverHost, ClientChannelPipelineFactory pipelineFactory) {
        logger.debug("Connecting to host {} ...", (Object)serverHost);
        ClientBootstrap bootstrap = new ClientBootstrap(this.channelManager.getChannelFactory());
        bootstrap.setPipelineFactory((ChannelPipelineFactory)pipelineFactory);
        bootstrap.setOption("tcpNoDelay", (Object)true);
        bootstrap.setOption("keepAlive", (Object)true);
        return bootstrap.connect((SocketAddress)serverHost);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(boolean wait) {
        HChannelImpl hChannelImpl = this;
        synchronized (hChannelImpl) {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        if (null == this.channel) {
            return;
        }
        try {
            HChannelImpl.getHChannelHandlerFromChannel(this.channel).closeExplicitly();
        }
        catch (NoResponseHandlerException nrhe) {
            logger.warn("No channel handler found for channel {} when closing it.", (Object)this.channel);
        }
        if (wait) {
            this.channel.close().awaitUninterruptibly();
        } else {
            this.channel.close();
        }
        this.channel = null;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("[HChannel: host - ").append(this.host).append(", channel - ").append(this.channel).append(", pending reqs - ").append(this.pendingOps.size()).append(", closed - ").append(this.closed).append("]");
        return sb.toString();
    }

    @Override
    public void close() {
        this.close(false);
    }

    public static HChannelHandler getHChannelHandlerFromChannel(Channel channel) throws NoResponseHandlerException {
        if (null == channel) {
            throw new NoResponseHandlerException("Received a null value for the channel. Cannot retrieve the response handler");
        }
        HChannelHandler handler = (HChannelHandler)channel.getPipeline().getLast();
        if (null == handler) {
            throw new NoResponseHandlerException("Could not retrieve the response handler from the channel's pipeline.");
        }
        return handler;
    }

    static enum State {
        DISCONNECTED,
        CONNECTING,
        CONNECTED;

    }
}

