/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tajo.rpc;

import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.ProtoPipelineFactory;
import org.apache.tajo.rpc.RemoteException;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.RpcProtos;
import org.apache.tajo.rpc.TajoServiceException;
import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ConnectTimeoutException;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;

/**
 * Blocking protobuf RPC client built on top of {@link NettyClientBase}.
 *
 * <p>Each call made through the {@link BlockingRpcChannel} returned by
 * {@link #getBlockingRpcChannel()} is assigned a sequence id, registered in an
 * in-flight table, written to the channel, and then the calling thread blocks
 * until the upstream handler completes the matching {@link ProtoCallFuture}.
 */
public class BlockingRpcClient extends NettyClientBase {
    private static final Log LOG = LogFactory.getLog(BlockingRpcClient.class);

    private final ChannelUpstreamHandler handler;
    private final ChannelPipelineFactory pipeFactory;
    private final ProxyRpcChannel rpcChannel;
    /** Source of request ids; ids are only used as keys of {@link #requests}. */
    private final AtomicInteger sequence = new AtomicInteger(0);
    /** In-flight calls keyed by request id, waiting for a response or a failure. */
    private final Map<Integer, ProtoCallFuture> requests = new ConcurrentHashMap<Integer, ProtoCallFuture>();
    private final Class<?> protocol;
    /** The static {@code newBlockingStub(BlockingRpcChannel)} factory of the generated service class. */
    private final Method stubMethod;
    private final RpcConnectionPool.RpcConnectionKey key;

    /**
     * Creates a client for the given protocol and initializes the underlying channel.
     *
     * @param protocol protocol class; its nested {@code <SimpleName>Service} class must expose a
     *                 public static {@code newBlockingStub(BlockingRpcChannel)} method
     * @param addr     remote address handed to {@code NettyClientBase.init}
     * @param factory  channel factory handed to {@code NettyClientBase.init}
     * @param retries  retry count handed to {@code NettyClientBase.init}
     * @throws ClassNotFoundException  if the nested service class cannot be loaded
     * @throws NoSuchMethodException   if the service class has no {@code newBlockingStub} method
     * @throws ConnectTimeoutException propagated from {@code NettyClientBase.init}
     */
    BlockingRpcClient(Class<?> protocol, InetSocketAddress addr, ClientSocketChannelFactory factory, int retries) throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
        this.protocol = protocol;
        String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
        Class<?> serviceClass = Class.forName(serviceClassName);
        this.stubMethod = serviceClass.getMethod("newBlockingStub", BlockingRpcChannel.class);
        this.handler = new ClientChannelUpstreamHandler();
        this.pipeFactory = new ProtoPipelineFactory(this.handler, RpcProtos.RpcResponse.getDefaultInstance());
        super.init(addr, this.pipeFactory, factory, retries);
        // Must come after init(): the ProxyRpcChannel constructor looks the handler up in the channel pipeline.
        this.rpcChannel = new ProxyRpcChannel();
        this.key = new RpcConnectionPool.RpcConnectionKey(addr, protocol, false);
    }

    /** Returns the connection-pool key built from the address and protocol given at construction. */
    @Override
    public RpcConnectionPool.RpcConnectionKey getKey() {
        return this.key;
    }

    /**
     * Creates a blocking stub bound to this client's channel by reflectively invoking
     * {@code newBlockingStub}.
     *
     * @param <T> stub type expected by the caller; the cast is unchecked
     * @return a stub instance
     * @throws RuntimeException wrapping any exception raised by the reflective call
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getStub() {
        try {
            return (T) this.stubMethod.invoke(null, this.rpcChannel);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /** Returns the {@link BlockingRpcChannel} that routes calls through this client. */
    public BlockingRpcChannel getBlockingRpcChannel() {
        return this.rpcChannel;
    }

    /**
     * Prefixes {@code message} with the protocol name and the normalized remote address when both
     * are available, otherwise with a generic prefix.
     */
    private String getErrorMessage(String message) {
        if (this.protocol != null && this.getChannel() != null) {
            InetSocketAddress remote = (InetSocketAddress) this.getChannel().getRemoteAddress();
            return this.protocol.getName() + "(" + NetUtils.normalizeInetSocketAddress(remote) + "): " + message;
        }
        return "Exception " + message;
    }

    /**
     * Builds a {@link TajoServiceException} from an error response, attaching the protocol name
     * and remote address when both are available.
     */
    private TajoServiceException makeTajoServiceException(RpcProtos.RpcResponse response, Throwable cause) {
        if (this.protocol != null && this.getChannel() != null) {
            InetSocketAddress remote = (InetSocketAddress) this.getChannel().getRemoteAddress();
            return new TajoServiceException(response.getErrorMessage(), cause, this.protocol.getName(), NetUtils.normalizeInetSocketAddress(remote));
        }
        return new TajoServiceException(response.getErrorMessage());
    }

    /**
     * Future completed by the upstream handler with either a response message or a failure.
     *
     * <p>Completion is signalled through a {@link CountDownLatch}, so once completed the future
     * stays done: {@link #isDone()} keeps returning {@code true} and repeated {@code get} calls
     * return immediately.
     */
    static class ProtoCallFuture implements Future<Message> {
        private final CountDownLatch done = new CountDownLatch(1);
        private final Message returnType;
        private final RpcController controller;
        // Written before done.countDown() and read only after done.await() returns.
        private Message response = null;
        private ExecutionException ee;

        /**
         * @param controller controller to notify on failure; may be {@code null}
         * @param message    prototype used to decode the response payload
         */
        public ProtoCallFuture(RpcController controller, Message message) {
            this.controller = controller;
            this.returnType = message;
        }

        /** Cancellation is not supported; always returns {@code false}. */
        @Override
        public boolean cancel(boolean arg0) {
            return false;
        }

        /**
         * Blocks until the call completes.
         *
         * @return the response message, possibly {@code null} if the response carried no payload
         * @throws ExecutionException if the call was failed via {@link #setFailed}
         */
        @Override
        public Message get() throws InterruptedException, ExecutionException {
            this.done.await();
            return this.outcome();
        }

        /**
         * Blocks up to the given timeout until the call completes.
         *
         * @throws ExecutionException if the call was failed via {@link #setFailed}
         * @throws TimeoutException   if the call did not complete in time
         */
        @Override
        public Message get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (!this.done.await(timeout, unit)) {
                throw new TimeoutException();
            }
            return this.outcome();
        }

        /** Returns the response, or throws the recorded failure; call only after completion. */
        private Message outcome() throws ExecutionException {
            if (this.ee != null) {
                throw this.ee;
            }
            return this.response;
        }

        /** Always {@code false}; see {@link #cancel(boolean)}. */
        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return this.done.getCount() == 0;
        }

        /** Completes the call successfully and wakes up waiters. */
        public void setResponse(Message response) {
            this.response = response;
            this.done.countDown();
        }

        /**
         * Completes the call with a failure, notifies the controller if one was supplied, and
         * wakes up waiters.
         */
        public void setFailed(String errorText, Throwable t) {
            if (this.controller != null) {
                this.controller.setFailed(errorText);
            }
            this.ee = new ExecutionException(errorText, t);
            this.done.countDown();
        }
    }

    /** Upstream handler that matches incoming responses to pending calls. */
    private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
        private ClientChannelUpstreamHandler() {
        }

        /**
         * Completes the pending call whose id matches the received {@code RpcResponse}.
         * Responses with no matching call are logged and dropped.
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            RpcProtos.RpcResponse rpcResponse = (RpcProtos.RpcResponse) e.getMessage();
            ProtoCallFuture callback = BlockingRpcClient.this.requests.remove(rpcResponse.getId());
            if (callback == null) {
                LOG.warn("Dangling rpc call");
                return;
            }
            if (rpcResponse.hasErrorMessage()) {
                callback.setFailed(rpcResponse.getErrorMessage(), BlockingRpcClient.this.makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
                // NOTE(review): throwing here presumably makes Netty route the exception to
                // exceptionCaught below, which closes the channel and fails every other pending
                // call. Kept as-is to preserve existing behavior; confirm this is intended.
                throw new RemoteException(BlockingRpcClient.this.getErrorMessage(rpcResponse.getErrorMessage()));
            }
            Message responseMessage = null;
            if (rpcResponse.hasResponseMessage()) {
                try {
                    responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage()).build();
                }
                catch (Exception decodeFailure) {
                    // The future is already out of the in-flight table, so nothing else would
                    // ever complete it; fail it here so the caller does not block forever.
                    callback.setFailed(decodeFailure.getMessage(), decodeFailure);
                    throw decodeFailure;
                }
            }
            callback.setResponse(responseMessage);
        }

        /**
         * Closes the channel, fails every pending call with the exception's cause, and logs it.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            e.getChannel().close();
            Throwable cause = e.getCause();
            // Remove each entry while failing it so the table does not retain completed futures.
            for (Integer requestId : BlockingRpcClient.this.requests.keySet()) {
                ProtoCallFuture callback = BlockingRpcClient.this.requests.remove(requestId);
                if (callback != null) {
                    callback.setFailed(cause.getMessage(), cause);
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.error("" + cause.getMessage(), cause);
            } else {
                LOG.error("RPC Exception:" + cause.getMessage());
            }
        }
    }

    /** {@link BlockingRpcChannel} that sends requests over this client's channel and waits for the reply. */
    private class ProxyRpcChannel implements BlockingRpcChannel {
        private final ClientChannelUpstreamHandler handler;

        /**
         * @throws IllegalArgumentException if the channel pipeline holds no
         *                                  {@code ClientChannelUpstreamHandler}
         */
        public ProxyRpcChannel() {
            this.handler = BlockingRpcClient.this.getChannel().getPipeline().get(ClientChannelUpstreamHandler.class);
            if (this.handler == null) {
                throw new IllegalArgumentException("Channel does not have proper handler");
            }
        }

        /**
         * Sends the request and blocks until its response or failure arrives.
         *
         * @return the decoded response, or {@code null} if the response carried no payload
         * @throws TajoServiceException the remote failure if one was reported, otherwise a new
         *                              exception carrying the message of the local failure
         */
        @Override
        public Message callBlockingMethod(Descriptors.MethodDescriptor method, RpcController controller, Message param, Message responsePrototype) throws TajoServiceException {
            int nextSeqId = BlockingRpcClient.this.sequence.getAndIncrement();
            Message rpcRequest = this.buildRequest(nextSeqId, method, param);
            ProtoCallFuture callFuture = new ProtoCallFuture(controller, responsePrototype);
            BlockingRpcClient.this.requests.put(nextSeqId, callFuture);
            try {
                BlockingRpcClient.this.getChannel().write(rpcRequest);
                return callFuture.get();
            }
            catch (InterruptedException interrupted) {
                // Restore the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new TajoServiceException(interrupted.getMessage());
            }
            catch (ExecutionException failed) {
                Throwable cause = failed.getCause();
                if (cause instanceof TajoServiceException) {
                    throw (TajoServiceException) cause;
                }
                throw new TajoServiceException(failed.getMessage());
            }
            finally {
                // No-op when the handler already removed the entry; otherwise prevents a leak
                // when the write throws or the wait is interrupted.
                BlockingRpcClient.this.requests.remove(nextSeqId);
            }
        }

        /** Wraps the method name and optional serialized parameter in an {@code RpcRequest}. */
        private Message buildRequest(int seqId, Descriptors.MethodDescriptor method, Message param) {
            RpcProtos.RpcRequest.Builder requestBuilder = RpcProtos.RpcRequest.newBuilder().setId(seqId).setMethodName(method.getName());
            if (param != null) {
                requestBuilder.setRequestMessage(param.toByteString());
            }
            return requestBuilder.build();
        }
    }
}

