/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tajo.rpc;

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.util.NetUtils;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ConnectTimeoutException;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;

public abstract class NettyClientBase
implements Closeable {
    private static Log LOG = LogFactory.getLog(NettyClientBase.class);
    private static final int CLIENT_CONNECTION_TIMEOUT_SEC = 60;
    private static final long PAUSE = 1000L;
    private int numRetries;
    protected ClientBootstrap bootstrap;
    private ChannelFuture channelFuture;

    public abstract <T> T getStub();

    public abstract RpcConnectionPool.RpcConnectionKey getKey();

    public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory, int numRetries) throws ConnectTimeoutException {
        this.numRetries = numRetries;
        this.init(addr, pipeFactory, factory);
    }

    public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory) throws ConnectTimeoutException {
        this.bootstrap = new ClientBootstrap((ChannelFactory)factory);
        this.bootstrap.setPipelineFactory(pipeFactory);
        this.bootstrap.setOption("connectTimeoutMillis", (Object)10000);
        this.bootstrap.setOption("connectResponseTimeoutMillis", (Object)10000);
        this.bootstrap.setOption("receiveBufferSize", (Object)0xA00000);
        this.bootstrap.setOption("tcpNoDelay", (Object)true);
        this.bootstrap.setOption("keepAlive", (Object)true);
        this.connect(addr);
    }

    private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException {
        this.channelFuture = this.bootstrap.connect((SocketAddress)addr);
        final CountDownLatch latch = new CountDownLatch(1);
        this.channelFuture.addListener(new ChannelFutureListener(){
            private final AtomicInteger retryCount = new AtomicInteger();

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    if (NettyClientBase.this.numRetries > this.retryCount.getAndIncrement()) {
                        Thread.sleep(1000L);
                        NettyClientBase.this.channelFuture = NettyClientBase.this.bootstrap.connect((SocketAddress)addr);
                        NettyClientBase.this.channelFuture.addListener((ChannelFutureListener)this);
                        LOG.debug((Object)("Connecting to " + addr + " has been failed. Retrying to connect."));
                    } else {
                        latch.countDown();
                        LOG.error((Object)("Max retry count has been exceeded. attempts=" + NettyClientBase.this.numRetries));
                    }
                } else {
                    latch.countDown();
                }
            }
        });
        try {
            latch.await(60L, TimeUnit.SECONDS);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (!this.channelFuture.isSuccess()) {
            throw new ConnectTimeoutException("Connect error to " + addr + " caused by " + ExceptionUtils.getMessage((Throwable)this.channelFuture.getCause()));
        }
    }

    public void connect(InetSocketAddress addr) throws ConnectTimeoutException {
        if (addr.isUnresolved()) {
            addr = NetUtils.createSocketAddr((String)addr.getHostName(), (int)addr.getPort());
        }
        this.handleConnectionInternally(addr);
    }

    public boolean isConnected() {
        return this.getChannel().isConnected();
    }

    public InetSocketAddress getRemoteAddress() {
        if (this.channelFuture == null || this.channelFuture.getChannel() == null) {
            return null;
        }
        return (InetSocketAddress)this.channelFuture.getChannel().getRemoteAddress();
    }

    public Channel getChannel() {
        return this.channelFuture.getChannel();
    }

    @Override
    public void close() {
        InetSocketAddress address;
        if (this.channelFuture != null && this.getChannel().isOpen()) {
            try {
                this.getChannel().close().awaitUninterruptibly();
            }
            catch (Throwable ce) {
                LOG.warn((Object)ce);
            }
        }
        if (this.bootstrap != null && (address = this.getRemoteAddress()) != null) {
            LOG.debug((Object)("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort()));
        }
    }
}

